
[BUG] Cluster Manager task throttler blocks network threads #13741

Open · Bukhtawar opened this issue May 18, 2024 · 10 comments
Labels: bug, Cluster Manager, enhancement, v2.17.0

Comments

Bukhtawar (Collaborator) commented May 18, 2024

Describe the bug

Transport worker threads are blocked waiting on a ConcurrentHashMap node lock taken inside ClusterManagerTaskThrottler.onBeginSubmit, while the lock holder emits a WARN log from within the computeIfPresent remapping function. Thread-dump and hot-threads excerpts from the affected node:

"opensearch[b869f183befc74cff9f3b5572821ec21][transport_worker][T#7]" #49 daemon prio=5 os_prio=0 cpu=83095452.97ms elapsed=286686.74s tid=0x0000fffe34008180 nid=0x5638 waiting for monitor entry  [0x0000fffef0315000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.util.concurrent.ConcurrentHashMap.computeIfPresent(java.base@17.0.9/ConcurrentHashMap.java:1819)
        - waiting to lock <0x00000011e9000000> (a java.util.concurrent.ConcurrentHashMap$Node)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler.onBeginSubmit(ClusterManagerTaskThrottler.java:221)
        at org.opensearch.cluster.service.TaskBatcher.submitTasks(TaskBatcher.java:85)
        at org.opensearch.cluster.service.MasterService.submitStateUpdateTasks(MasterService.java:998)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTasks(ClusterService.java:373)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTask(ClusterService.java:351)
        at org.opensearch.snapshots.SnapshotsService.innerUpdateSnapshotState(SnapshotsService.java:3856)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3951)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3913)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.masterOperation(TransportClusterManagerNodeAction.java:177)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.clusterManagerOperation(TransportClusterManagerNodeAction.java:186)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.lambda$doStart$1(TransportClusterManagerNodeAction.java:292)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction$$Lambda$6044/0x0000008801c64240.accept(Unknown Source)
        at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
        at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.doStart(TransportClusterManagerNodeAction.java:289)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.tryAction(TransportClusterManagerNodeAction.java:239)
        at org.opensearch.action.support.RetryableAction$1.doRun(RetryableAction.java:139)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
        at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
        at org.opensearch.action.support.RetryableAction.run(RetryableAction.java:117)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:200)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:88)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:218)
        at org.opensearch.indexmanagement.controlcenter.notification.filter.IndexOperationActionFilter.apply(IndexOperationActionFilter.kt:39)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter.apply(FieldCapsFilter.kt:118)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.security.filter.SecurityFilter.apply0(SecurityFilter.java:294)
        at org.opensearch.security.filter.SecurityFilter.apply(SecurityFilter.java:165)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter.apply(PerformanceAnalyzerActionFilter.java:78)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.action.support.TransportAction.execute(TransportAction.java:188)
        at org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:133)
        at org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:129)
        at org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceivedDecorate(SecuritySSLRequestHandler.java:210)
        at org.opensearch.security.transport.SecurityRequestHandler.messageReceivedDecorate(SecurityRequestHandler.java:323)
        at org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceived(SecuritySSLRequestHandler.java:158)
        at org.opensearch.security.OpenSearchSecurityPlugin$6$1.messageReceived(OpenSearchSecurityPlugin.java:852)
        at org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor$interceptHandler$1.messageReceived(RollupInterceptor.kt:114)
        at org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportRequestHandler.messageReceived(PerformanceAnalyzerTransportRequestHandler.java:43)
        at org.opensearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:106)
        at org.opensearch.transport.InboundHandler.handleRequest(InboundHandler.java:271)
        at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:144)
        at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:127)
        at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:770)
        at org.opensearch.transport.netty4.Netty4MessageChannelHandler$$Lambda$5488/0x00000088013adaa0.accept(Unknown Source)

82.2% (8.2s out of 10s) cpu usage by thread 'opensearch[b869f183befc74cff9f3b5572821ec21][transport_worker][T#6]'
     6/10 snapshots sharing following 87 elements
       java.base@17.0.9/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1819)
       app//org.opensearch.cluster.service.ClusterManagerTaskThrottler.onBeginSubmit(ClusterManagerTaskThrottler.java:221)
       app//org.opensearch.cluster.service.TaskBatcher.submitTasks(TaskBatcher.java:85)
       app//org.opensearch.cluster.service.MasterService.submitStateUpdateTasks(MasterService.java:998)
       app//org.opensearch.cluster.service.ClusterService.submitStateUpdateTasks(ClusterService.java:373)
       app//org.opensearch.cluster.service.ClusterService.submitStateUpdateTask(ClusterService.java:351)
       app//org.opensearch.snapshots.SnapshotsService.innerUpdateSnapshotState(SnapshotsService.java:3856)
       app//org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3951)
       app//org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3913)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.masterOperation(TransportClusterManagerNodeAction.java:177)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.clusterManagerOperation(TransportClusterManagerNodeAction.java:186)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.lambda$doStart$1(TransportClusterManagerNodeAction.java:292)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction$$Lambda$6044/0x0000008801c64240.accept(Unknown Source)
       app//org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
       app//org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
       app//org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.doStart(TransportClusterManagerNodeAction.java:289)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.tryAction(TransportClusterManagerNodeAction.java:239)
       app//org.opensearch.action.support.RetryableAction$1.doRun(RetryableAction.java:139)
       app//org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
       app//org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
       app//org.opensearch.action.support.RetryableAction.run(RetryableAction.java:117)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:200)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:88)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:218)
       org.opensearch.indexmanagement.controlcenter.notification.filter.IndexOperationActionFilter.apply(IndexOperationActionFilter.kt:39)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter.apply(FieldCapsFilter.kt:118)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.security.filter.SecurityFilter.apply0(SecurityFilter.java:294)
       org.opensearch.security.filter.SecurityFilter.apply(SecurityFilter.java:165)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter.apply(PerformanceAnalyzerActionFilter.java:78)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       app//org.opensearch.action.support.TransportAction.execute(TransportAction.java:188)
       app//org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:133)
       app//org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:129)
       org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceivedDecorate(SecuritySSLRequestHandler.java:210)
       org.opensearch.security.transport.SecurityRequestHandler.messageReceivedDecorate(SecurityRequestHandler.java:323)
       org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceived(SecuritySSLRequestHandler.java:158)
       org.opensearch.security.OpenSearchSecurityPlugin$6$1.messageReceived(OpenSearchSecurityPlugin.java:852)
       org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor$interceptHandler$1.messageReceived(RollupInterceptor.kt:114)
       org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportRequestHandler.messageReceived(PerformanceAnalyzerTransportRequestHandler.java:43)
       app//org.opensearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:106)
       app//org.opensearch.transport.InboundHandler.handleRequest(InboundHandler.java:271)
"opensearch[b869f183befc74cff9f3b5572821ec21][transport_worker][T#4]" #46 daemon prio=5 os_prio=0 cpu=80430471.06ms elapsed=289343.25s tid=0x0000fffe7ee12660 nid=0x5635 runnable  [0x0000fffef0614000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.Object.notifyAll(java.base@17.0.9/Native Method)
        at org.apache.logging.log4j.core.async.TimeoutBlockingWaitStrategy.signalAllWhenBlocking(TimeoutBlockingWaitStrategy.java:104)
        - locked <0x0000001001000b10> (a java.lang.Object)
        at com.lmax.disruptor.MultiProducerSequencer.publish(MultiProducerSequencer.java:218)
        at com.lmax.disruptor.RingBuffer.translateAndPublish(RingBuffer.java:990)
        at com.lmax.disruptor.RingBuffer.tryPublishEvent(RingBuffer.java:538)
        at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor.tryEnqueue(AsyncLoggerConfigDisruptor.java:387)
        at org.apache.logging.log4j.core.async.AsyncLoggerConfig.logToAsyncDelegate(AsyncLoggerConfig.java:157)
        at org.apache.logging.log4j.core.async.AsyncLoggerConfig.log(AsyncLoggerConfig.java:138)
        at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:560)
        at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
        at org.apache.logging.log4j.core.Logger.log(Logger.java:163)
        at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2168)
        at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2122)
        at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2105)
        at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2009)
        at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1878)
        at org.apache.logging.log4j.spi.AbstractLogger.warn(AbstractLogger.java:2757)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler.lambda$onBeginSubmit$0(ClusterManagerTaskThrottler.java:227)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler$$Lambda$6671/0x0000008801f25c08.apply(Unknown Source)
        at java.util.concurrent.ConcurrentHashMap.computeIfPresent(java.base@17.0.9/ConcurrentHashMap.java:1828)
        - locked <0x00000011e9000000> (a java.util.concurrent.ConcurrentHashMap$Node)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler.onBeginSubmit(ClusterManagerTaskThrottler.java:221)
        at org.opensearch.cluster.service.TaskBatcher.submitTasks(TaskBatcher.java:85)
        at org.opensearch.cluster.service.MasterService.submitStateUpdateTasks(MasterService.java:998)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTasks(ClusterService.java:373)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTask(ClusterService.java:351)
        at org.opensearch.snapshots.SnapshotsService.innerUpdateSnapshotState(SnapshotsService.java:3856)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3951)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3913)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.masterOperation(TransportClusterManagerNodeAction.java:177)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.clusterManagerOperation(TransportClusterManagerNodeAction.java:186)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.lambda$doStart$1(TransportClusterManagerNodeAction.java:292)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction$$Lambda$6044/0x0000008801c64240.accept(Unknown Source)
        at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)

Related component

Cluster Manager


Expected behavior

  1. Network threads shouldn't get blocked.
  2. Task submission should be faster.


Bukhtawar added the bug and untriaged labels on May 18, 2024
peternied (Member) commented

[Triage - attendees 1 2 3 4 5 6 7 8 9 10]
@Bukhtawar thank you for creating this issue to track this problem. In the future, please include more details so it is clearer how to identify this issue and how to know when it is resolved.

rajiv-kv (Contributor) commented Jun 7, 2024

Looking at the thread-dump stack traces, the workers are waiting for the lock on the throttling key (update-snapshot-state), and the worker holding the lock is busy logging to file. The log is emitted at WARN level whenever a task is throttled, just before failing the transport call.

One possible fix to avoid the lock contention is to move the following two pieces of work outside of the computeIfPresent call:

  • on-throttle error handling and logging
  • minimum node version check

That way the critical section is limited to incrementing the request count and nothing else, which would prevent transport workers from getting blocked when they all happen to enqueue tasks belonging to the same throttling key. A rough sketch of the idea follows.
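A minimal sketch of that refactor, with illustrative names only (this is not the actual OpenSearch implementation; the real onBeginSubmit also handles per-key thresholds and node-version checks):

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: keep only the counter update inside the computeIfPresent
// critical section; do logging and exception construction after the lock is released.
class ThrottlerSketch {
    private final ConcurrentHashMap<String, Long> tasksCount = new ConcurrentHashMap<>();

    void onBeginSubmit(String throttlingKey, int tasksToAdd, long threshold) {
        final boolean[] throttled = new boolean[1];
        tasksCount.computeIfPresent(throttlingKey, (key, count) -> {
            if (count + tasksToAdd > threshold) {
                throttled[0] = true;   // record the decision, defer the expensive work
                return count;          // leave the counter unchanged
            }
            return count + tasksToAdd;
        });
        // WARN logging and exception construction happen outside the bin lock,
        // so a slow appender cannot block other transport workers on the same key.
        if (throttled[0]) {
            System.err.println("Throttling pending tasks for key [" + throttlingKey + "]");
            throw new RuntimeException("Cluster manager task throttled for key: " + throttlingKey);
        }
    }
}
```

Any production version would also need to keep the existing per-key limit semantics; the point here is only where the logging sits relative to the lock.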

@Bukhtawar @shwetathareja - Thoughts?

Bukhtawar (Collaborator, Author) commented

Do you think we should move the task submission and throttling logic off network threads to avoid getting into retry loops and stalling transport?

rajiv-kv (Contributor) commented

I see; the task submission involves the following two operations:

  • check for duplicate tasks before enqueueing
  • throttling decision and enforcement

Let me know if I am missing something.

Both of these operations look to be lightweight based on the current implementation and should not take much time.
I think we can profile submitTask and evaluate whether some of the operations need to be moved to a background thread (working on a snapshot of the pending-task queue). For instance, the throttling decider could be moved to a background thread, with submitTask only enforcing the throttling decision.

On retrying a request once it is throttled, I think this should happen from the caller rather than adding retry handlers in the cluster manager. We can have the cluster manager send additional signals about the pending-task queue when a request is throttled, which the caller can use to decide on retries (an illustrative sketch follows).
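As an illustration only (none of these names exist in OpenSearch), the throttling rejection could carry a queue-depth hint that the caller's retry policy consults:

```java
// Hypothetical sketch of a throttling rejection that carries a hint for callers;
// the field and method names are invented for illustration.
class ThrottledException extends RuntimeException {
    final long pendingTasksForKey;   // queue depth observed at rejection time

    ThrottledException(String key, long pendingTasksForKey) {
        super("Throttled cluster-manager task for key: " + key, null, false, false);
        this.pendingTasksForKey = pendingTasksForKey;
    }

    // Caller side: back off proportionally to the reported queue depth instead of
    // retrying immediately on the transport thread.
    static long backoffMillis(ThrottledException e) {
        return Math.min(30_000L, 100L * Math.max(1L, e.pendingTasksForKey));
    }
}
```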

rwali-aws added the v2.16.0 label on Jul 4, 2024
shwetathareja (Member) commented Jul 4, 2024

> Do you think we should move the task submission and throttling logic off network threads to avoid getting into retry loops and stalling transport?

@Bukhtawar As long as we ensure the work done on the transport thread is minimal, we don't need to move it to a separate threadpool. @rajiv-kv let's profile the submit-task method after fixing the logging overhead.

Bukhtawar (Collaborator, Author) commented

> Do you think we should move the task submission and throttling logic off network threads to avoid getting into retry loops and stalling transport?

> @Bukhtawar As long as we ensure the work done on the transport thread is minimal, we don't need to move it to a separate threadpool. @rajiv-kv let's profile the submit-task method after fixing the logging overhead.

We are probably also retrying on the network thread, which I don't think is right. Let's verify whether the retry logic runs on the network thread.

rwali-aws added the enhancement label on Jul 11, 2024
rwali-aws added the v2.17.0 label and removed the v2.16.0 label on Jul 22, 2024
rajiv-kv (Contributor) commented

[Screenshot: memory allocation profile, 2024-08-12]

Memory allocations during Task Throttling.

  • ClusterManagerThrottlingException seems to be making allocations to fill in the stack trace, which can be removed (see the sketch below).
  • Additionally, we can make a shallow check without the lock before the computeIfPresent call to avoid lock-related allocations when the task queue is full.

Thanks @backslasht for sharing the dumps.
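
For the first bullet, a common pattern for flow-control exceptions (shown here as a generic sketch, not necessarily the actual OpenSearch change) is to disable stack-trace capture via the four-argument Throwable constructor:

```java
// Illustrative sketch: a throttling exception that skips fillInStackTrace(),
// avoiding per-rejection allocations on the hot path.
class ThrottlingBackpressureException extends RuntimeException {
    ThrottlingBackpressureException(String message) {
        // enableSuppression = false, writableStackTrace = false
        super(message, null, false, false);
    }
}
```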

dhwanilpatel (Contributor) commented

@rajiv-kv / @sumitasr

With shallow checks, a race condition might occur: we decrement the counter and the cluster manager can take the task, but the shallow check outside the lock still rejects the request.

Considering the tradeoff against the transport thread being blocked, I guess this race condition should be okay. In any case, the data node will retry and the retry can go through.

rajiv-kv (Contributor) commented Sep 2, 2024

> @rajiv-kv / @sumitasr
>
> With shallow checks, a race condition might occur: we decrement the counter and the cluster manager can take the task, but the shallow check outside the lock still rejects the request.
>
> Considering the tradeoff against the transport thread being blocked, I guess this race condition should be okay. In any case, the data node will retry and the retry can go through.

Yes @dhwanilpatel, the throttling enforcement will be based on the queue size observed when the request was submitted. It will not consider the in-flight tasks that are being drained and might free up space. A sketch of the shallow check is below.
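
A sketch of what the lock-free shallow check could look like (illustrative names only; as noted above, the plain get() can observe a stale count, so a request may be rejected even though capacity has just been freed, and the caller's retry is expected to absorb that):

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: cheap read-only pre-check before taking the map's bin lock.
class ShallowCheckSketch {
    private final ConcurrentHashMap<String, Long> tasksCount = new ConcurrentHashMap<>();

    void onBeginSubmit(String throttlingKey, int tasksToAdd, long threshold) {
        // A plain get() takes no lock; if the queue is clearly full, reject immediately
        // without contending on the bin lock that computeIfPresent would take.
        Long observed = tasksCount.get(throttlingKey);
        if (observed != null && observed + tasksToAdd > threshold) {
            // The observed count may be stale: tasks drained in flight are not
            // reflected here, so this can reject a request that would have fit.
            throw new RuntimeException("Cluster manager task throttled for key: " + throttlingKey);
        }
        // The authoritative check and increment still run under the bin lock
        // (rejection signalling omitted for brevity, as in the earlier sketch).
        tasksCount.computeIfPresent(throttlingKey, (key, count) ->
            count + tasksToAdd > threshold ? count : count + tasksToAdd);
    }
}
```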

sumitasr (Member) commented

> [Screenshot: memory allocation profile, 2024-08-12]
>
> Memory allocations during Task Throttling.
>
> * `ClusterManagerThrottlingException` seems to be making allocations to fill in the stack trace, which can be removed.
> * Additionally, we can make a shallow check without the lock before the `computeIfPresent` call to avoid lock-related allocations when the task queue is full.
>
> Thanks @backslasht for sharing the dumps.

The proposed changes were merged in 2.17 as part of PR #15508.

Projects: Now (This Quarter)
7 participants