Skip to content

Commit

Permalink
Flaky test fix. Query only 1 broker to test quota split (apache#13771)
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 authored Sep 6, 2024
1 parent 50ad070 commit 6e8333a
Showing 1 changed file with 70 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Properties;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest;
import org.apache.pinot.client.BrokerResponse;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
import org.apache.pinot.client.PinotClientException;
import org.apache.pinot.client.PinotClientTransport;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.spi.config.table.QuotaConfig;
Expand All @@ -46,6 +51,9 @@
* tested as part of {@link HelixExternalViewBasedQueryQuotaManagerTest}
*/
public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest {
private PinotClientTransport _pinotClientTransport;
private String _brokerHostPort;

@BeforeClass
public void setUp()
throws Exception {
Expand All @@ -56,6 +64,7 @@ public void setUp()
startController();
startBrokers(1);
startServers(1);
_brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);

// Create and upload the schema and table config
Schema schema = createSchema();
Expand All @@ -65,9 +74,11 @@ public void setUp()

Properties properties = new Properties();
properties.put(FAIL_ON_EXCEPTIONS, "FALSE");
_pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory()
.withConnectionProperties(getPinotConnectionProperties())
.buildTransport();
_pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName(),
new JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties())
.buildTransport());
_pinotClientTransport);
}

@AfterMethod
Expand All @@ -76,6 +87,8 @@ void resetQuotas()
addQueryQuotaToClusterConfig(null);
addQueryQuotaToDatabaseConfig(null);
addQueryQuotaToTableConfig(null);
_brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
verifyQuotaUpdate(0);
}

@Test
Expand Down Expand Up @@ -125,12 +138,13 @@ public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
addQueryQuotaToTableConfig(10);
// Add one more broker such that quota gets distributed equally among them
brokerStarter = startOneBroker(2);
// to allow change propagation to QueryQuotaManager
Thread.sleep(1000);
testQueryRate(10);
_brokerHostPort = LOCAL_HOST + ":" + brokerStarter.getPort();
// query only one broker across the divided quota
testQueryRateOnBroker(5);
// drop table level quota so that database quota comes into effect
addQueryQuotaToTableConfig(null);
testQueryRate(25);
// query only one broker across the divided quota
testQueryRateOnBroker(12.5f);
} finally {
if (brokerStarter != null) {
brokerStarter.stop();
Expand All @@ -143,19 +157,29 @@ public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
* Then runs the query load with double the max rate and expects queries to fail due to quota breach.
* @param maxRate max rate allowed by the quota
*/
void testQueryRate(int maxRate)
void testQueryRate(float maxRate)
throws Exception {
verifyQuotaUpdate(maxRate);
runQueries(maxRate, false);
//increase the qps and some of the queries should be throttled.
runQueries(maxRate * 2, true);
}

void testQueryRateOnBroker(float maxRate)
throws Exception {
verifyQuotaUpdate(maxRate);
runQueriesOnBroker(maxRate, false);
//increase the qps and some of the queries should be throttled.
runQueriesOnBroker(maxRate * 2, true);
}

// try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis
// is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps
private void runQueries(double qps, boolean shouldFail)
throws Exception {
int failCount = 0;
long sleepMillis = (long) (1000 / qps);
Thread.sleep(sleepMillis);
for (int i = 0; i < qps * 2; i++) {
ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT COUNT(*) FROM " + getTableName());
for (PinotClientException exception : resultSetGroup.getExceptions()) {
Expand All @@ -169,14 +193,52 @@ private void runQueries(double qps, boolean shouldFail)
assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && shouldFail));
}

private void verifyQuotaUpdate(float quotaQps) {
TestUtils.waitForCondition(aVoid -> {
try {
float tableQuota = Float.parseFloat(sendGetRequest(String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE",
_brokerHostPort, getTableName())));
tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
float dbQuota = Float.parseFloat(sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default",
_brokerHostPort)));
dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
return quotaQps == Math.min(tableQuota, dbQuota)
|| (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == Long.MAX_VALUE);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, 5000, "Failed to reflect query quota on rate limiter in 5s");
}

private BrokerResponse executeQueryOnBroker(String query) {
return _pinotClientTransport.executeQuery(_brokerHostPort, query);
}

private void runQueriesOnBroker(double qps, boolean shouldFail)
throws Exception {
int failCount = 0;
long sleepMillis = (long) (1000 / qps);
Thread.sleep(sleepMillis);
for (int i = 0; i < qps * 2; i++) {
BrokerResponse resultSetGroup = executeQueryOnBroker("SELECT COUNT(*) FROM " + getTableName());
for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); it.hasNext(); ) {
JsonNode exception = it.next();
if (exception.toPrettyString().contains("QuotaExceededError")) {
failCount++;
break;
}
}
Thread.sleep(sleepMillis);
}
assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && shouldFail));
}

public void addQueryQuotaToTableConfig(Integer maxQps)
throws Exception {
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? null : maxQps.toString()));
updateTableConfig(tableConfig);
// to allow change propagation to QueryQuotaManager
Thread.sleep(1000);
}

public void addQueryQuotaToDatabaseConfig(Integer maxQps)
Expand All @@ -187,7 +249,6 @@ public void addQueryQuotaToDatabaseConfig(Integer maxQps)
}
HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new URI(url), null, null));
// to allow change propagation to QueryQuotaManager
Thread.sleep(1000);
}

public void addQueryQuotaToClusterConfig(Integer maxQps)
Expand All @@ -202,6 +263,5 @@ public void addQueryQuotaToClusterConfig(Integer maxQps)
_httpClient.sendJsonPostRequest(new URI(_controllerRequestURLBuilder.forClusterConfigs()), payload));
}
// to allow change propagation to QueryQuotaManager
Thread.sleep(1000);
}
}

0 comments on commit 6e8333a

Please sign in to comment.