Skip to content

Commit

Permalink
IGNITE-20424 Added client session outbound message queue size limit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
petrov-mg authored Oct 13, 2023
1 parent 1141f17 commit 0f0e465
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import javax.cache.configuration.Factory;
import javax.net.ssl.SSLContext;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.ssl.SslContextFactory;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -57,6 +58,9 @@ public class ClientConnectorConfiguration {
/** Default value of whether to use Ignite SSL context factory. */
public static final boolean DFLT_USE_IGNITE_SSL_CTX_FACTORY = true;

/** Default session outbound queue limit. */
public static final int DFLT_SESSIONS_MESSAGE_QUEUE_LIMIT = 0;

/** Host. */
private String host;

Expand Down Expand Up @@ -114,6 +118,14 @@ public class ClientConnectorConfiguration {
/** Thin-client specific configuration. */
private ThinClientConfiguration thinCliCfg = new ThinClientConfiguration();

/**
* Client session outbound message queue limit. Limits the number of messages waiting to be sent from the
* server side to particular Client. If the specified limit is exceeded, corresponding Client connection
* will be closed.
* The value {@code 0} means that no limit is applied to the Client outbound message queue.
*/
private int sesOutboundMsgQueueLimit = DFLT_SESSIONS_MESSAGE_QUEUE_LIMIT;

/**
* Creates SQL connector configuration with all default values.
*/
Expand Down Expand Up @@ -147,6 +159,7 @@ public ClientConnectorConfiguration(ClientConnectorConfiguration cfg) {
useIgniteSslCtxFactory = cfg.isUseIgniteSslContextFactory();
sslCtxFactory = cfg.getSslContextFactory();
thinCliCfg = new ThinClientConfiguration(cfg.getThinClientConfiguration());
sesOutboundMsgQueueLimit = cfg.getSessionOutboundMessageQueueLimit();
}

/**
Expand Down Expand Up @@ -592,6 +605,28 @@ public ClientConnectorConfiguration setThinClientConfiguration(ThinClientConfigu
return this;
}

/** @return Session outbound message queue limit. */
public int getSessionOutboundMessageQueueLimit() {
return sesOutboundMsgQueueLimit;
}

/**
* Sets Client session outbound message queue limit. Limits the number of messages waiting to be sent from the
* server side to particular client. If the specified limit is exceeded, corresponding Client connection
* will be closed.
* The value {@code 0} means that no limit is applied to the Client outbound message queue.
*
* @param sesOutboundMsgQueueLimit Session outbound queue limit.
* @return {@code this} for chaining.
*/
public ClientConnectorConfiguration setSessionOutboundMessageQueueLimit(int sesOutboundMsgQueueLimit) {
A.ensure(sesOutboundMsgQueueLimit >= 0, "Session outbound queue limit must be greater than or equal to zero.");

this.sesOutboundMsgQueueLimit = sesOutboundMsgQueueLimit;

return this;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ClientConnectorConfiguration.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.spi.IgnitePortProtocol;
Expand Down Expand Up @@ -116,6 +117,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter {
/** Thin client distributed configuration. */
private DistributedThinClientConfiguration distrThinCfg;

/** Client connector configuration. */
private ClientConnectorConfiguration cliConnCfg;

/**
* @param ctx Kernal context.
*/
Expand All @@ -127,7 +131,7 @@ public ClientListenerProcessor(GridKernalContext ctx) {
@Override public void start() throws IgniteCheckedException {
IgniteConfiguration cfg = ctx.config();

ClientConnectorConfiguration cliConnCfg = prepareConfiguration(cfg);
cliConnCfg = prepareConfiguration(cfg);

if (cliConnCfg != null) {
try {
Expand Down Expand Up @@ -167,6 +171,11 @@ public ClientListenerProcessor(GridKernalContext ctx) {

metrics = new ClientListenerMetrics(mreg);

IgniteBiInClosure<GridNioSession, Integer> msgQueueSizeLsnr =
cliConnCfg.getSessionOutboundMessageQueueLimit() > 0
? this::onOutboundMessageOffered
: null;

for (int port = cliConnCfg.getPort(); port <= portTo && port <= 65535; port++) {
try {
srv = GridNioServer.<ClientMessage>builder()
Expand All @@ -186,6 +195,7 @@ public ClientListenerProcessor(GridKernalContext ctx) {
.directMode(true)
.idleTimeout(idleTimeout > 0 ? idleTimeout : Long.MAX_VALUE)
.metricRegistry(mreg)
.messageQueueSizeListener(msgQueueSizeLsnr)
.build();

ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
Expand Down Expand Up @@ -671,6 +681,22 @@ public ClientProcessorMXBean mxBean() {
return new ClientProcessorMXBeanImpl();
}

/** */
private void onOutboundMessageOffered(GridNioSession ses, int queueSize) {
if (queueSize < cliConnCfg.getSessionOutboundMessageQueueLimit())
return;

srv.close(ses).listen(fut -> {
if (fut.error() == null && fut.result()) {
U.quietAndWarn(log, "Ignite Thin Client outbound message queue size is exceeded" +
" 'SessionOutboundMessageQueueLimit', it will be disconnected" +
" [locNodeId=" + ctx.localNodeId() +
", clientAddress=" + ses.remoteAddress() +
", sessionOutboundMessageQueueLimit=" + cliConnCfg.getSessionOutboundMessageQueueLimit() + ']');
}
});
}

/**
* ClientProcessorMXBean interface.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.odbc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.events.ConnectionClosedEvent;
import org.apache.ignite.client.events.ConnectionEventListener;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;

/** */
public class ClientSessionOutboundQueueLimitTest extends GridCommonAbstractTest {
/** */
public static final int MSG_QUEUE_LIMIT = 100;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setClientConnectorConfiguration(new ClientConnectorConfiguration()
.setSessionOutboundMessageQueueLimit(MSG_QUEUE_LIMIT));
}

/**
* Test scenario:
* 1. Thin client performs huge amount of async cache get requests. It does not matter if they belong to unique keys.
* 2. Server accepts all of them and prepares responses that are accumulated in the thin client session outbound
* message queue.
* 3. Programmatically, we limit the thin client's ability to receive messages from the server, causing the message
* queue to eventually become full.
* 4. Checks that thin client is disconnected from the cluster and all requests in progress are failed.
*/
@Test
public void testClientSessionOutboundQueueLimit() throws Exception {
startGrid(0);

AtomicBoolean isCliDisconnected = new AtomicBoolean(false);

try (
IgniteClient cli = Ignition.startClient(new ClientConfiguration()
.setAddresses("127.0.0.1:10800")
.setEventListeners(new ConnectionEventListener() {
@Override public void onConnectionClosed(ConnectionClosedEvent event) {
isCliDisconnected.set(true);
}
}))
) {
ClientCache<Integer, byte[]> cache = cli.getOrCreateCache(DEFAULT_CACHE_NAME);

byte[] val = new byte[4096];

ThreadLocalRandom.current().nextBytes(val);

cache.put(0, val);

skipClientWrite(grid(0), true);

Collection<IgniteInternalFuture<byte[]>> futs = new ArrayList<>();

try {
while (!isCliDisconnected.get())
futs.add(GridTestUtils.runAsync(() -> cache.get(0)));
}
finally {
skipClientWrite(grid(0), false);
}

AtomicInteger failedReqsCntr = new AtomicInteger();

futs.forEach(fut -> {
try {
fut.get();
}
catch (Exception e) {
assertTrue(e.getMessage().contains("Channel is closed"));

failedReqsCntr.incrementAndGet();
}
});

assertTrue(failedReqsCntr.get() >= MSG_QUEUE_LIMIT);
}
}

/** */
private void skipClientWrite(IgniteEx ignite, boolean skip) {
ClientListenerProcessor cliPrc = ignite.context().clientListener();

setFieldValue(U.field(cliPrc, "srv"), "skipWrite", skip);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
import org.apache.ignite.internal.processors.continuous.GridMessageListenSelfTest;
import org.apache.ignite.internal.processors.odbc.ClientListenerMetricsTest;
import org.apache.ignite.internal.processors.odbc.ClientSessionOutboundQueueLimitTest;
import org.apache.ignite.internal.processors.odbc.OdbcConfigurationValidationSelfTest;
import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtilsTest;
Expand Down Expand Up @@ -138,7 +139,8 @@
OdbcConfigurationValidationSelfTest.class,
OdbcEscapeSequenceSelfTest.class,
SqlListenerUtilsTest.class,
JavaVersionCommandParserTest.class
JavaVersionCommandParserTest.class,
ClientSessionOutboundQueueLimitTest.class,
})
public class IgniteBasicTestSuite {
}

0 comments on commit 0f0e465

Please sign in to comment.