From 2ada27873e3db0e08f5021082fd926e6522880a1 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Thu, 29 Aug 2024 21:06:54 +0530 Subject: [PATCH] [ClusterManagerTaskThrottler Improvements] + Add shallow check in ClusterManagerTaskThrottler's onBeginSubmit method before computeIfPresent to avoid lock when queue is full + Remove stack trace filling in ClusterManagerThrottlingException Signed-off-by: Sumit Bansal --- CHANGELOG.md | 1 + .../service/ClusterManagerTaskThrottler.java | 48 ++++++++++++------- .../ClusterManagerThrottlingException.java | 6 +++ 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8b695205e789..a712daf08bd6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) +- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index 827f3a12fbce4..a507c62418994 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -209,28 +209,40 @@ Long getThrottlingLimit(final String taskKey) { return tasksThreshold.get(taskKey); } + private void checkForClusterManagerThrottling( + final ThrottlingKey clusterManagerThrottlingKey, + final String taskThrottlingKey, + final long taskCount, + final int tasksSize + ) { + if (clusterManagerThrottlingKey.isThrottlingEnabled()) { + Long threshold = tasksThreshold.get(taskThrottlingKey); + if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) { + clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize); + logger.warn( + "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", + taskThrottlingKey, + tasksSize, + threshold + ); + throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); + } + } + } + @Override public void onBeginSubmit(List tasks) { - ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) + final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) .getClusterManagerThrottlingKey(); - tasksCount.putIfAbsent(clusterManagerThrottlingKey.getTaskThrottlingKey(), 0L); - tasksCount.computeIfPresent(clusterManagerThrottlingKey.getTaskThrottlingKey(), (key, count) -> { + final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey(); + tasksCount.putIfAbsent(taskThrottlingKey, 0L); + + // Performing shallow check before taking lock, performing throttle check and computing new count + checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, tasksCount.get(taskThrottlingKey), tasks.size()); + + tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> { int size = tasks.size(); - if (clusterManagerThrottlingKey.isThrottlingEnabled()) { - Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); - if (threshold != null && shouldThrottle(threshold, count, size)) { - clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); - logger.warn( - "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", - clusterManagerThrottlingKey.getTaskThrottlingKey(), - tasks.size(), - threshold - ); - throw new ClusterManagerThrottlingException( - "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey.getTaskThrottlingKey() - ); - } - } + checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, count, size); return count + size; }); } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java index 04fa9fa45d5ea..7a835910c400f 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java @@ -25,4 +25,10 @@ public ClusterManagerThrottlingException(String msg, Object... args) { public ClusterManagerThrottlingException(StreamInput in) throws IOException { super(in); } + + @Override + public Throwable fillInStackTrace() { + // This is on the hot path; stack traces are expensive to compute and not very useful for this exception, so don't fill it. + return this; + } }