Skip to content

Commit

Permalink
Perform code refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
  • Loading branch information
sumitasr committed Sep 3, 2024
1 parent 27b68c5 commit d80be09
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* <p>
* Set specific setting to for setting the threshold of throttling of particular task type.
* e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks,
* Set it to default value(-1) to disable the throttling for this task type.
* Set it to default value(-1) to disable the throttling for this task type.
*/
public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);
Expand Down Expand Up @@ -69,7 +69,7 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling
private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener;

private final ConcurrentMap<String, Long> tasksCount;
final ConcurrentMap<String, Long> tasksCount;
private final ConcurrentMap<String, Long> tasksThreshold;
private final Supplier<Version> minNodeVersionSupplier;

Expand Down Expand Up @@ -210,23 +210,14 @@ Long getThrottlingLimit(final String taskKey) {
}

private void checkForClusterManagerThrottling(
final ThrottlingKey clusterManagerThrottlingKey,
final String taskThrottlingKey,
final boolean throttlingEnabledWithThreshold,
final Long threshold,
final long taskCount,
final int tasksSize
final int tasksSize,
final String taskThrottlingKey
) {
if (clusterManagerThrottlingKey.isThrottlingEnabled()) {
Long threshold = tasksThreshold.get(taskThrottlingKey);
if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) {
clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize);
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
taskThrottlingKey,
tasksSize,
threshold
);
throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey);
}
if (throttlingEnabledWithThreshold && shouldThrottle(threshold, taskCount, tasksSize)) {
throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey);
}
}

Expand All @@ -235,16 +226,33 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey)
.getClusterManagerThrottlingKey();
final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey();
final Long threshold = getThrottlingLimit(taskThrottlingKey);
final boolean isThrottlingEnabledWithThreshold = clusterManagerThrottlingKey.isThrottlingEnabled() && threshold != null;
tasksCount.putIfAbsent(taskThrottlingKey, 0L);

// Performing shallow check before taking lock, performing throttle check and computing new count
checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, tasksCount.get(taskThrottlingKey), tasks.size());

tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> {
int size = tasks.size();
checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, count, size);
return count + size;
});
int tasksSize = tasks.size();

try {
checkForClusterManagerThrottling(
isThrottlingEnabledWithThreshold,
threshold,
tasksCount.get(taskThrottlingKey),
tasksSize,
taskThrottlingKey
);
tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> {
checkForClusterManagerThrottling(isThrottlingEnabledWithThreshold, threshold, count, tasksSize, taskThrottlingKey);
return count + tasksSize;
});
} catch (final ClusterManagerThrottlingException e) {
clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize);
logger.trace(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
taskThrottlingKey,
tasksSize,
threshold
);
throw e;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.ClusterServiceUtils.setState;
Expand Down Expand Up @@ -69,7 +71,7 @@ public static void afterClass() {
public void testDefaults() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);
throttler.registerClusterManagerTask("create-index", true);
Expand Down Expand Up @@ -108,7 +110,7 @@ public void testValidateSettingsForDifferentVersion() {
}
}

public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {
public void testValidateSettingsForTaskWithoutRetryOnDataNode() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_5_0);
setState(
Expand Down Expand Up @@ -139,7 +141,7 @@ public void testUpdateSettingsForNullValue() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);

Expand Down Expand Up @@ -173,7 +175,7 @@ public void testSettingsOnBootstrap() {
.put("cluster_manager.throttling.retry.max.delay", maxDelay + "s")
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);

Expand All @@ -187,7 +189,7 @@ public void testSettingsOnBootstrap() {
public void testUpdateRetryDelaySetting() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());

// verify defaults
Expand Down Expand Up @@ -217,7 +219,7 @@ public void testValidateSettingsForUnknownTask() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());

// set some limit for update snapshot tasks
Expand All @@ -236,7 +238,7 @@ public void testUpdateThrottlingLimitForBasicSanity() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);

Expand All @@ -263,7 +265,7 @@ public void testValidateSettingForLimit() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);

Expand All @@ -274,7 +276,7 @@ public void testValidateSettingForLimit() {
public void testUpdateLimit() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);

Expand Down Expand Up @@ -309,7 +311,7 @@ public void testThrottlingForDisabledThrottlingTask() {
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, false);

Expand All @@ -321,6 +323,9 @@ public void testThrottlingForDisabledThrottlingTask() {

// Asserting that there was not any throttling for it
assertEquals(0L, throttlingStats.getThrottlingCount(taskKey));

// Asserting value in tasksCount map to make sure it gets updated even when throttling is disabled
assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
}

public void testThrottlingForInitialStaticSettingAndVersionCheck() {
Expand All @@ -339,7 +344,7 @@ public void testThrottlingForInitialStaticSettingAndVersionCheck() {
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true);

Expand Down Expand Up @@ -367,7 +372,7 @@ public void testThrottling() {
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true);

Expand Down Expand Up @@ -406,6 +411,58 @@ public void testThrottling() {
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1));
}

public void testThrottlingWithLock() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true);

throttler.updateLimit(taskKey, 5);

// adding 3 tasks
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));

// adding 3 more tasks, these tasks should be throttled
// taskCount in Queue: 3 Threshold: 5
assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3))
);
assertEquals(3L, throttlingStats.getThrottlingCount(taskKey));

// remove one task
throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1));

// add 3 tasks should pass now.
// taskCount in Queue: 2 Threshold: 5
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));

final CountDownLatch latch = new CountDownLatch(1);

// Taking lock on tasksCount will not impact throttling behaviour now.
new Thread(() -> {
throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10L;
});
}).start();

// adding one task will throttle
// taskCount in Queue: 5 Threshold: 5
assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
);
latch.countDown();
}

private List<TaskBatcherTests.TestTaskBatcher.UpdateTask> getMockUpdateTaskList(
String taskKey,
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey,
Expand Down

0 comments on commit d80be09

Please sign in to comment.