Skip to content

Commit

Permalink
IGNITE-22964 Java thin: fix client init hang on unreachable discovere…
Browse files Browse the repository at this point in the history
…d address (#11486)

* Do not perform cluster discovery synchronously while initializing the client - do `applyOnDefaultChannel` before checking `channelsCnt.get() == 0` in `channelsInit`
* Do not disconnect active channels when performing discovery, even if those addresses are not in the new list to avoid unnecessary reconnects
  • Loading branch information
ptupitsyn authored Aug 21, 2024
1 parent 12cb12c commit add61eb
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,19 @@ synchronized void initChannelHolders() {
return;
}

// Add connected channels to the list to avoid unnecessary reconnects, unless address finder is used.
if (holders != null && clientCfg.getAddressesFinder() == null) {
// Do not modify the original list.
newAddrs = new ArrayList<>(newAddrs);

for (ClientChannelHolder h : holders) {
ClientChannel ch = h.ch;

if (ch != null && !ch.closed())
newAddrs.add(h.getAddresses());
}
}

Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();

Set<InetSocketAddress> newAddrsSet = newAddrs.stream().flatMap(Collection::stream).collect(Collectors.toSet());
Expand Down Expand Up @@ -744,13 +757,16 @@ void channelsInit(@Nullable List<ClientConnectionException> failures) {
initChannelHolders();

if (failures == null || failures.size() < attemptsLimit) {
// Establish default channel connection.
applyOnDefaultChannel(channel -> null, null, failures);

if (channelsCnt.get() == 0) {
// Establish default channel connection and retrive nodes endpoints if applicable.
if (applyOnDefaultChannel(discoveryCtx::refresh, null, failures))
boolean discoveryUpdated = applyOnDefaultChannel(discoveryCtx::refresh, null, failures);

if (discoveryUpdated)
initChannelHolders();
}
else // Apply no-op function. Establish default channel connection.
applyOnDefaultChannel(channel -> null, null, failures);
}

if (partitionAwarenessEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,11 @@ public void testSingleServerFailover() throws Exception {
// Fail.
dropAllThinClientConnections(Ignition.allGrids().get(0));

Throwable ex = GridTestUtils.assertThrowsWithCause(() -> cachePut(cache, 0, 0), ClientConnectionException.class);
if (!partitionAware) {
Throwable ex = GridTestUtils.assertThrowsWithCause(() -> cachePut(cache, 0, 0), ClientConnectionException.class);

GridTestUtils.assertContains(null, ex.getMessage(), F.first(cluster.clientAddresses()));
GridTestUtils.assertContains(null, ex.getMessage(), F.first(cluster.clientAddresses()));
}

// Recover after fail.
cachePut(cache, 0, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,13 @@ public void testExecuteTaskConnectionLost() throws Exception {
Future<Object> fut1 = compute.executeAsync(TestLatchTask.class.getName(), null);

// Wait for the task to start, then drop connections.
TestLatchTask.startLatch.await();
assertTrue(TestLatchTask.startLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
dropAllThinClientConnections();

TestLatchTask.startLatch = new CountDownLatch(1);
Future<Object> fut2 = compute.executeAsync(TestLatchTask.class.getName(), null);

TestLatchTask.startLatch.await();
assertTrue(TestLatchTask.startLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
dropAllThinClientConnections();

TestLatchTask.latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.ignite.internal.client.thin;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -54,7 +56,7 @@
@SuppressWarnings("rawtypes")
public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommonAbstractTest {
/** Wait timeout. */
protected static final long WAIT_TIMEOUT = 5_000L;
protected static final long WAIT_TIMEOUT = 15_000L;

/** Replicated cache name. */
protected static final String REPL_CACHE_NAME = "replicated_cache";
Expand Down Expand Up @@ -153,16 +155,31 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
* Checks that operation goes through specified channel.
*/
protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh, ClientOperation expOp) {
assertOpOnChannel(expCh, expOp, null);
}

/**
* Checks that operation goes through specified channel.
*/
protected void assertOpOnChannel(
@Nullable TestTcpClientChannel expCh,
ClientOperation expOp,
@Nullable ClientOperation ignoreOp) {
while (opsQueue.peek() != null && opsQueue.peek().get2() == ignoreOp) {
opsQueue.poll();
}

T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
T2<TestTcpClientChannel, ClientOperation> queuedOp = opsQueue.peek();

assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", expOp=" + expOp + ']', nextChOp);

assertEquals("Unexpected operation on channel [expCh=" + expCh + ", expOp=" + expOp +
", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']', expOp, nextChOp.get2());

if (expCh != null) {
assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp +
", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']', expCh, nextChOp.get1());
}
}

Expand Down Expand Up @@ -245,10 +262,41 @@ protected void awaitChannelsInit(int... chIdxs) throws IgniteInterruptedCheckedE
// Wait until all channels initialized.
for (int ch : chIdxs) {
assertTrue("Failed to wait for channel[" + ch + "] init",
GridTestUtils.waitForCondition(() -> channels[ch] != null, WAIT_TIMEOUT));
GridTestUtils.waitForCondition(() -> isConnected(ch), WAIT_TIMEOUT));
}
}

/**
* Gets a value indicating whether the channel is connected at the specified index (port offset).
*
* @param chIdx Channel index (port offset).
* @return {@code true} if the channel is connected, {@code false} otherwise.
*/
protected boolean isConnected(int chIdx) {
List<ReliableChannel.ClientChannelHolder> channelHolders = ((TcpIgniteClient)client).reliableChannel().getChannelHolders();
int chPort = DFLT_PORT + chIdx;

for (ReliableChannel.ClientChannelHolder holder : channelHolders) {
if (holder == null || holder.isClosed()) {
continue;
}

ClientChannel ch = GridTestUtils.getFieldValue(holder, "ch");

if (ch == null || ch.closed()) {
continue;
}

for (InetSocketAddress addr : holder.getAddresses()) {
if (addr.getPort() == chPort) {
return true;
}
}
}

return false;
}

/**
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,24 @@

package org.apache.ignite.internal.client.thin;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;

import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;

/**
* Test endpoints discovery by thin client.
*/
Expand Down Expand Up @@ -122,4 +136,35 @@ public void testDiscoveryAfterAllNodesFailed() throws Exception {

awaitChannelsInit(0);
}

/** */
@Test
public void testUnreachableAddressDiscoveredDoesNotPreventClientInit() throws Exception {
try (ServerSocket sock = new ServerSocket()) {
sock.bind(new InetSocketAddress("127.0.0.1", 0));

ArrayList<String> addrs = new ArrayList<>();
addrs.add("127.0.0.1:" + sock.getLocalPort());

IgniteEx server = startGrid(0);
ClusterNode serverNode = server.cluster().localNode();

// Override node attributes - set local port of the "fake server" socket which does not work.
Map<String, Object> attrsFiltered = serverNode.attributes();
Map<String, Object> attrsSealed = GridTestUtils.getFieldValue(attrsFiltered, "map");
Map<String, Object> attrs = GridTestUtils.getFieldValue(attrsSealed, "m");
attrs.put(ClientListenerProcessor.CLIENT_LISTENER_PORT, sock.getLocalPort());

// Config has good server address, client discovery returns unreachable address.
// We expect the client to connect to the good address and ignore the unreachable one.
ClientConfiguration ccfg = new ClientConfiguration()
.setTimeout(2000)
.setAddresses("127.0.0.1:" + DFLT_PORT);

IgniteClient client = Ignition.startClient(ccfg);

Collection<String> cacheNames = client.cacheNames();
assertFalse(cacheNames.isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ public void testConnectionLoss() throws Exception {
cache.put(key, 0);

// Request goes to the connected channel, since affinity node is disconnected.
assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT);
assertOpOnChannel(null, ClientOperation.CACHE_PUT);

cache.put(key, 0);

// Connection to disconnected node should be restored after retry.
assertOpOnChannel(channels[disconnectNodeIdx], ClientOperation.CACHE_PUT);
assertOpOnChannel(channels[disconnectNodeIdx], ClientOperation.CACHE_PUT, ClientOperation.CACHE_PARTITIONS);

// Test partition awareness.
testPartitionAwareness(false);
Expand Down

0 comments on commit add61eb

Please sign in to comment.