Skip to content

Commit

Permalink
1.Enhance the thread pool to filter out specified jobs
Browse files Browse the repository at this point in the history
2.Fix the issue where jobs submitted to the thread pool are enhanced twice.
  • Loading branch information
hexiaofeng committed May 16, 2024
1 parent 40dc1b0 commit 60ecce2
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 12 deletions.
4 changes: 3 additions & 1 deletion joylive-package/src/main/assembly/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ agent:
groupExpression: ${unit}-${cell}-${group}
transmission:
thread:
excludes:
excludeExecutors:
- org.apache.dubbo.common.threadpool.ThreadlessExecutor
- org.apache.tomcat.util.threads.ThreadPoolExecutor
- org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor
Expand Down Expand Up @@ -161,6 +161,8 @@ agent:
- com.alibaba.nacos.shaded.io.grpc.stub.ClientCalls.ThreadlessExecutor
- com.alibaba.nacos.shaded.io.grpc.SynchronizationContext
- com.alibaba.nacos.shaded.com.google.common.util.concurrent.DirectExecutor
excludeTasks:
- com.alibaba.nacos.shaded.io.grpc.internal.DnsNameResolver.Resolve
counter:
gateway: true
service: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class ThreadConfig {
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.channel.nio.NioEventLoop",
"io.netty.channel.SingleThreadEventLoop",
"io.netty.channel.kqueue.KQueueEventLoopGroup",
"io.netty.channel.kqueue.KQueueEventLoop",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.util.concurrent.ThreadPerTaskExecutor",
Expand All @@ -49,6 +51,8 @@ public class ThreadConfig {
"io.netty.util.concurrent.SingleThreadEventExecutor",
"io.netty.util.concurrent.DefaultEventExecutor",
"io.netty.util.internal.ThreadExecutorMap$1",
"reactor.core.scheduler.BoundedElasticScheduler$BoundedScheduledExecutorService",
"reactor.netty.resources.ColocatedEventLoopGroup",
"com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup",
"com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.MultithreadEventLoopGroup",
"com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.util.concurrent.MultithreadEventExecutorGroup",
Expand All @@ -70,11 +74,19 @@ public class ThreadConfig {
"com.alibaba.nacos.shaded.com.google.common.util.concurrent.DirectExecutor"
};

private Set<String> excludes = new HashSet<>(Arrays.asList(EXCLUDE_EXECUTOR_CLASSES));
private static final String[] EXCLUDE_TASK_CLASSES = new String[]{
"com.alibaba.nacos.shaded.io.grpc.internal.DnsNameResolver.Resolve",
};

public boolean exclude(String name) {
return name == null || excludes.contains(name);
}
private Set<String> excludeExecutors = new HashSet<>(Arrays.asList(EXCLUDE_EXECUTOR_CLASSES));

private Set<String> excludeTasks = new HashSet<>(Arrays.asList(EXCLUDE_TASK_CLASSES));

public boolean isExcludedExecutor(String name) {
return name == null || excludeExecutors.contains(name);
}

public boolean isExcludedTask(String name) {
return name == null || excludeTasks.contains(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public class ExecutorDefinition extends PluginDefinitionAdapter {

public ExecutorDefinition() {
this.matcher = () -> MatcherBuilder.isImplement(TYPE_EXECUTOR).
and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludes())));
and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludeExecutors())));
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(MatcherBuilder.in(METHODS).and(MatcherBuilder.isPublic()),
() -> new ExecutorInterceptor(handlers))};
() -> new ExecutorInterceptor(handlers, threadConfig))};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public class ScheduledExecutorServiceDefinition extends PluginDefinitionAdapter

public ScheduledExecutorServiceDefinition() {
this.matcher = () -> MatcherBuilder.isImplement(TYPE_SCHEDULED_EXECUTOR_SERVICE).
and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludes())));
and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludeExecutors())));
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(MatcherBuilder.in(METHODS).and(MatcherBuilder.isPublic()),
() -> new ExecutorInterceptor(handlers))};
() -> new ExecutorInterceptor(handlers, threadConfig))};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,37 @@
import com.jd.live.agent.plugin.transmission.thread.adapter.CallableAdapter;
import com.jd.live.agent.plugin.transmission.thread.adapter.RunnableAdapter;
import com.jd.live.agent.plugin.transmission.thread.adapter.RunnableAndCallableAdapter;
import com.jd.live.agent.plugin.transmission.thread.config.ThreadConfig;

import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
* ExecutorInterceptor
*/
public class ExecutorInterceptor extends InterceptorAdaptor {

private static final String FIELD_CALLABLE = "callable";

private static Field CALLABLE_FIELD;

private final Camera[] cameras;

public ExecutorInterceptor(List<Camera> cameras) {
private final ThreadConfig threadConfig;

static {
try {
CALLABLE_FIELD = FutureTask.class.getDeclaredField(FIELD_CALLABLE);
CALLABLE_FIELD.setAccessible(true);
} catch (NoSuchFieldException ignore) {
}
}

public ExecutorInterceptor(List<Camera> cameras, ThreadConfig threadConfig) {
this.cameras = cameras == null ? new Camera[0] : cameras.toArray(new Camera[0]);
this.threadConfig = threadConfig;
}

@Override
Expand All @@ -47,15 +65,21 @@ public void onEnter(ExecutableContext ctx) {
return;
}
Object argument = arguments[0];
if (argument == null || argument instanceof AbstractThreadAdapter) {
if (argument == null) {
return;
}
Object unwrapped = unwrap(argument);
if (unwrapped instanceof AbstractThreadAdapter) {
return;
}
if (threadConfig.isExcludedTask(unwrapped.getClass().getName())) {
return;
}

Snapshot[] snapshots = new Snapshot[cameras.length];
for (int i = 0; i < cameras.length; i++) {
snapshots[i] = new Snapshot(cameras[i], cameras[i].snapshot());
}

if (argument instanceof Runnable && argument instanceof Callable) {
arguments[0] = new RunnableAndCallableAdapter<>(name, (Runnable) argument, (Callable<?>) argument, snapshots);
} else if (argument instanceof Runnable) {
Expand All @@ -65,4 +89,27 @@ public void onEnter(ExecutableContext ctx) {
}
}

/**
* Unwraps the provided argument object to retrieve its underlying value. If the argument is an instance
* of {@link AbstractThreadAdapter}, it is returned directly. If the argument is an instance of {@link FutureTask},
* the method attempts to extract the 'callable' field from it using reflection. If the extraction fails,
* the exception is ignored, and the original argument is returned.
*
* @param argument the object to unwrap, which could be an instance of {@link AbstractThreadAdapter} or
* {@link FutureTask} or any other Object.
* @return the unwrapped object if unwrapping is possible, otherwise the original object.
*/
private Object unwrap(Object argument) {
if (argument instanceof AbstractThreadAdapter) {
return argument;
}
if (argument instanceof FutureTask && CALLABLE_FIELD != null) {
try {
return CALLABLE_FIELD.get(argument);
} catch (Exception ignore) {
}
}
return argument;
}

}

0 comments on commit 60ecce2

Please sign in to comment.