diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java index 178165900b15c..8a993bcc5304d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java @@ -19,6 +19,7 @@ import javax.cache.configuration.Factory; import javax.net.ssl.SSLContext; +import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.ssl.SslContextFactory; import org.jetbrains.annotations.Nullable; @@ -57,6 +58,9 @@ public class ClientConnectorConfiguration { /** Default value of whether to use Ignite SSL context factory. */ public static final boolean DFLT_USE_IGNITE_SSL_CTX_FACTORY = true; + /** Default session outbound queue limit. */ + public static final int DFLT_SESSIONS_MESSAGE_QUEUE_LIMIT = 0; + /** Host. */ private String host; @@ -114,6 +118,14 @@ public class ClientConnectorConfiguration { /** Thin-client specific configuration. */ private ThinClientConfiguration thinCliCfg = new ThinClientConfiguration(); + /** + * Client session outbound message queue limit. Limits the number of messages waiting to be sent from the + * server side to particular Client. If the specified limit is exceeded, corresponding Client connection + * will be closed. + * The value {@code 0} means that no limit is applied to the Client outbound message queue. + */ + private int sesOutboundMsgQueueLimit = DFLT_SESSIONS_MESSAGE_QUEUE_LIMIT; + /** * Creates SQL connector configuration with all default values. */ @@ -147,6 +159,7 @@ public ClientConnectorConfiguration(ClientConnectorConfiguration cfg) { useIgniteSslCtxFactory = cfg.isUseIgniteSslContextFactory(); sslCtxFactory = cfg.getSslContextFactory(); thinCliCfg = new ThinClientConfiguration(cfg.getThinClientConfiguration()); + sesOutboundMsgQueueLimit = cfg.getSessionOutboundMessageQueueLimit(); } /** @@ -592,6 +605,28 @@ public ClientConnectorConfiguration setThinClientConfiguration(ThinClientConfigu return this; } + /** @return Session outbound message queue limit. */ + public int getSessionOutboundMessageQueueLimit() { + return sesOutboundMsgQueueLimit; + } + + /** + * Sets Client session outbound message queue limit. Limits the number of messages waiting to be sent from the + * server side to particular client. If the specified limit is exceeded, corresponding Client connection + * will be closed. + * The value {@code 0} means that no limit is applied to the Client outbound message queue. + * + * @param sesOutboundMsgQueueLimit Session outbound queue limit. + * @return {@code this} for chaining. + */ + public ClientConnectorConfiguration setSessionOutboundMessageQueueLimit(int sesOutboundMsgQueueLimit) { + A.ensure(sesOutboundMsgQueueLimit >= 0, "Session outbound queue limit must be greater than or equal to zero."); + + this.sesOutboundMsgQueueLimit = sesOutboundMsgQueueLimit; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ClientConnectorConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java index 3a502135905f9..897a2919706fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.mxbean.ClientProcessorMXBean; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.spi.IgnitePortProtocol; @@ -116,6 +117,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter { /** Thin client distributed configuration. */ private DistributedThinClientConfiguration distrThinCfg; + /** Client connector configuration. */ + private ClientConnectorConfiguration cliConnCfg; + /** * @param ctx Kernal context. */ @@ -127,7 +131,7 @@ public ClientListenerProcessor(GridKernalContext ctx) { @Override public void start() throws IgniteCheckedException { IgniteConfiguration cfg = ctx.config(); - ClientConnectorConfiguration cliConnCfg = prepareConfiguration(cfg); + cliConnCfg = prepareConfiguration(cfg); if (cliConnCfg != null) { try { @@ -167,6 +171,11 @@ public ClientListenerProcessor(GridKernalContext ctx) { metrics = new ClientListenerMetrics(mreg); + IgniteBiInClosure msgQueueSizeLsnr = + cliConnCfg.getSessionOutboundMessageQueueLimit() > 0 + ? this::onOutboundMessageOffered + : null; + for (int port = cliConnCfg.getPort(); port <= portTo && port <= 65535; port++) { try { srv = GridNioServer.builder() @@ -186,6 +195,7 @@ public ClientListenerProcessor(GridKernalContext ctx) { .directMode(true) .idleTimeout(idleTimeout > 0 ? idleTimeout : Long.MAX_VALUE) .metricRegistry(mreg) + .messageQueueSizeListener(msgQueueSizeLsnr) .build(); ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); @@ -671,6 +681,22 @@ public ClientProcessorMXBean mxBean() { return new ClientProcessorMXBeanImpl(); } + /** */ + private void onOutboundMessageOffered(GridNioSession ses, int queueSize) { + if (queueSize < cliConnCfg.getSessionOutboundMessageQueueLimit()) + return; + + srv.close(ses).listen(fut -> { + if (fut.error() == null && fut.result()) { + U.quietAndWarn(log, "Ignite Thin Client outbound message queue size is exceeded" + + " 'SessionOutboundMessageQueueLimit', it will be disconnected" + + " [locNodeId=" + ctx.localNodeId() + + ", clientAddress=" + ses.remoteAddress() + + ", sessionOutboundMessageQueueLimit=" + cliConnCfg.getSessionOutboundMessageQueueLimit() + ']'); + } + }); + } + /** * ClientProcessorMXBean interface. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java new file mode 100644 index 0000000000000..c687b488633ce --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignition; +import org.apache.ignite.client.ClientCache; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.client.events.ConnectionClosedEvent; +import org.apache.ignite.client.events.ConnectionEventListener; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.testframework.GridTestUtils.setFieldValue; + +/** */ +public class ClientSessionOutboundQueueLimitTest extends GridCommonAbstractTest { + /** */ + public static final int MSG_QUEUE_LIMIT = 100; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setClientConnectorConfiguration(new ClientConnectorConfiguration() + .setSessionOutboundMessageQueueLimit(MSG_QUEUE_LIMIT)); + } + + /** + * Test scenario: + * 1. Thin client performs huge amount of async cache get requests. It does not matter if they belong to unique keys. + * 2. Server accepts all of them and prepares responses that are accumulated in the thin client session outbound + * message queue. + * 3. Programmatically, we limit the thin client's ability to receive messages from the server, causing the message + * queue to eventually become full. + * 4. Checks that thin client is disconnected from the cluster and all requests in progress are failed. + */ + @Test + public void testClientSessionOutboundQueueLimit() throws Exception { + startGrid(0); + + AtomicBoolean isCliDisconnected = new AtomicBoolean(false); + + try ( + IgniteClient cli = Ignition.startClient(new ClientConfiguration() + .setAddresses("127.0.0.1:10800") + .setEventListeners(new ConnectionEventListener() { + @Override public void onConnectionClosed(ConnectionClosedEvent event) { + isCliDisconnected.set(true); + } + })) + ) { + ClientCache cache = cli.getOrCreateCache(DEFAULT_CACHE_NAME); + + byte[] val = new byte[4096]; + + ThreadLocalRandom.current().nextBytes(val); + + cache.put(0, val); + + skipClientWrite(grid(0), true); + + Collection> futs = new ArrayList<>(); + + try { + while (!isCliDisconnected.get()) + futs.add(GridTestUtils.runAsync(() -> cache.get(0))); + } + finally { + skipClientWrite(grid(0), false); + } + + AtomicInteger failedReqsCntr = new AtomicInteger(); + + futs.forEach(fut -> { + try { + fut.get(); + } + catch (Exception e) { + assertTrue(e.getMessage().contains("Channel is closed")); + + failedReqsCntr.incrementAndGet(); + } + }); + + assertTrue(failedReqsCntr.get() >= MSG_QUEUE_LIMIT); + } + } + + /** */ + private void skipClientWrite(IgniteEx ignite, boolean skip) { + ClientListenerProcessor cliPrc = ignite.context().clientListener(); + + setFieldValue(U.field(cliPrc, "srv"), "skipWrite", skip); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 496354ba38592..62c78b04e5d76 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest; import org.apache.ignite.internal.processors.continuous.GridMessageListenSelfTest; import org.apache.ignite.internal.processors.odbc.ClientListenerMetricsTest; +import org.apache.ignite.internal.processors.odbc.ClientSessionOutboundQueueLimitTest; import org.apache.ignite.internal.processors.odbc.OdbcConfigurationValidationSelfTest; import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest; import org.apache.ignite.internal.processors.odbc.SqlListenerUtilsTest; @@ -138,7 +139,8 @@ OdbcConfigurationValidationSelfTest.class, OdbcEscapeSequenceSelfTest.class, SqlListenerUtilsTest.class, - JavaVersionCommandParserTest.class + JavaVersionCommandParserTest.class, + ClientSessionOutboundQueueLimitTest.class, }) public class IgniteBasicTestSuite { }