diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 3046d92d..6484a27b 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -120,7 +120,7 @@ public Sampler start(SparkPlatform platform) throws UnsupportedOperationExceptio boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null; boolean canUseAsyncProfiler = this.useAsyncProfiler && !onlyTicksOverMode && - !(this.ignoreSleeping || this.ignoreNative) && + !((this.ignoreSleeping && this.mode == SamplerMode.ALLOCATION) || this.ignoreNative) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform); if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) { @@ -136,9 +136,9 @@ public Sampler start(SparkPlatform platform) throws UnsupportedOperationExceptio Sampler sampler; if (this.mode == SamplerMode.ALLOCATION) { - sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly)); + sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly), this.ignoreSleeping); } else if (canUseAsyncProfiler) { - sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval)); + sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval), this.ignoreSleeping); } else if (onlyTicksOverMode) { sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); } else { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java index 2c003e5c..9625edbe 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -65,4 +65,13 @@ public List exportData() { } return data; } + + protected static boolean isSleeping(String clazz, String method) { + // java.lang.Thread.yield() + // jdk.internal.misc.Unsafe.park() + // sun.misc.Unsafe.park() + return (clazz.equals("java.lang.Thread") && method.equals("yield")) || + (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) || + (clazz.equals("sun.misc.Unsafe") && method.equals("park")); + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index 484493a3..1be6680a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -34,9 +34,11 @@ public class AsyncDataAggregator extends AbstractDataAggregator { /** A describer for async-profiler stack trace elements. */ private static final StackTraceNode.Describer STACK_TRACE_DESCRIBER = (element, parent) -> new StackTraceNode.AsyncDescription(element.getClassName(), element.getMethodName(), element.getMethodDescription()); + private final boolean ignoreSleeping; - protected AsyncDataAggregator(ThreadGrouper threadGrouper) { + protected AsyncDataAggregator(ThreadGrouper threadGrouper, boolean ignoreSleeping) { super(threadGrouper); + this.ignoreSleeping = ignoreSleeping; } @Override @@ -48,6 +50,9 @@ public SamplerMetadata.DataAggregator getMetadata() { } public void insertData(ProfileSegment element, int window) { + if (this.ignoreSleeping && isSleeping(element)) { + return; + } try { ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getValue(), window); @@ -56,4 +61,24 @@ public void insertData(ProfileSegment element, int window) { } } + private static boolean isSleeping(ProfileSegment element) { + // thread states written by async-profiler: + // https://github.com/async-profiler/async-profiler/blob/116504c9f75721911b2f561e29eda065c224caf6/src/flightRecorder.cpp#L1017-L1023 + String threadState = element.getThreadState(); + if (threadState.equals("STATE_SLEEPING")) { + return true; + } + + // async-profiler includes native frames - let's check more than just the top frame + AsyncStackTraceElement[] stackTrace = element.getStackTrace(); + for (int i = 0; i < Math.min(3, stackTrace.length); i++) { + String clazz = stackTrace[i].getClassName(); + String method = stackTrace[i].getMethodName(); + if (isSleeping(clazz, method)) { + return true; + } + } + return false; + } + } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 3d179489..edcb0e83 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -65,11 +65,11 @@ public class AsyncSampler extends AbstractSampler { /** The task to send statistics to the viewer socket */ private ScheduledFuture socketStatisticsTask; - public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector collector) { + public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector collector, boolean ignoreSleeping) { super(platform, settings); this.sampleCollector = collector; this.profilerAccess = AsyncProfilerAccess.getInstance(platform); - this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper()); + this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper(), ignoreSleeping); this.scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() .setNameFormat("spark-async-sampler-worker-thread") diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java index f20c9691..b38eca61 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -31,6 +31,7 @@ */ public class ProfileSegment { + private static final String UNKNOWN_THREAD_STATE = ""; /** The native thread id (does not correspond to Thread#getId) */ private final int nativeThreadId; /** The name of the thread */ @@ -39,12 +40,15 @@ public class ProfileSegment { private final AsyncStackTraceElement[] stackTrace; /** The time spent executing this segment in microseconds */ private final long value; + /** The state of the thread. {@value #UNKNOWN_THREAD_STATE} if state is unknown */ + private final String threadState; - public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value) { + private ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value, String threadState) { this.nativeThreadId = nativeThreadId; this.threadName = threadName; this.stackTrace = stackTrace; this.value = value; + this.threadState = threadState; } public int getNativeThreadId() { @@ -63,6 +67,10 @@ public long getValue() { return this.value; } + public String getThreadState() { + return threadState; + } + public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) { JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); int len = stackTrace != null ? stackTrace.methods.length : 0; @@ -71,8 +79,12 @@ public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event samp for (int i = 0; i < len; i++) { stack[i] = parseStackFrame(reader, stackTrace.methods[i]); } + String threadState = UNKNOWN_THREAD_STATE; + if (sample instanceof JfrReader.ExecutionSample) { + threadState = reader.threadStates.get(((JfrReader.ExecutionSample) sample).threadState); + } - return new ProfileSegment(sample.tid, threadName, stack, value); + return new ProfileSegment(sample.tid, threadName, stack, value, threadState); } private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java index 5b6a4705..b74c7bab 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -113,12 +113,7 @@ static boolean isSleeping(ThreadInfo thread) { String clazz = call.getClassName(); String method = call.getMethodName(); - // java.lang.Thread.yield() - // jdk.internal.misc.Unsafe.park() - // sun.misc.Unsafe.park() - return (clazz.equals("java.lang.Thread") && method.equals("yield")) || - (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) || - (clazz.equals("sun.misc.Unsafe") && method.equals("park")); + return isSleeping(clazz, method); } }