Skip to content

Commit

Permalink
【feature】流量标签透传特性
Browse files Browse the repository at this point in the history
支持跨线程传递标签,该能力可单独使用,可与路由等其他插件配合传递请求信息。
  • Loading branch information
lilai23 committed Aug 8, 2023
1 parent 4427972 commit 15d9bee
Show file tree
Hide file tree
Showing 26 changed files with 1,082 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ public static MethodMatcher isMemberMethod() {
return methodTypeMatches(MethodType.MEMBER);
}

/**
* 匹配公有方法,见{@link #methodTypeMatches}
*
* @return 方法匹配器对象
*/
public static MethodMatcher isPublicMethod() {
return methodTypeMatches(MethodType.PUBLIC);
}

/**
* 匹配符合类型的方法,包括静态方法,构造函数和成员方法三种
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public boolean match(MethodDescription methodDescription) {
public boolean match(MethodDescription methodDescription) {
return !methodDescription.isStatic() && !methodDescription.isConstructor();
}
},
/**
* 公有方法
*/
PUBLIC() {
@Override
public boolean match(MethodDescription methodDescription) {
return methodDescription.isPublic();
}
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.core.utils.tag;

import java.util.List;
import java.util.Map;

/**
* 流量相关新信息
*
* @author lilai
* @since 2023-07-26
*/
public class TrafficData extends TrafficTag {
private final String path;

private final String httpMethod;

/**
* 构造方法
*
* @param header 请求头/attachments
* @param path 请求路径
* @param httpMethod 请求方法
*/
public TrafficData(Map<String, List<String>> header, String path, String httpMethod) {
super(header);
this.path = path;
this.httpMethod = httpMethod;
}

public String getPath() {
return path;
}

public String getHttpMethod() {
return httpMethod;
}

@Override
public String toString() {
return "{"
+ "path='" + path + '\''
+ ", httpMethod='" + httpMethod + '\''
+ ", tag='" + getTag() + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,59 @@
* @since 2023-07-17
*/
public class TrafficUtils {
private static final ThreadLocal<TrafficTag> TAG = new ThreadLocal<>();
private static ThreadLocal<TrafficTag> tag = new ThreadLocal<>();

private static ThreadLocal<TrafficData> data = new ThreadLocal<>();

private TrafficUtils() {
}

/**
* 如果开启在new Thread时跨线程传递标签,需要把ThreadLocal初始化为InheritableThreadLocal
*/
public static void setInheritableThreadLocal() {
if (!(tag instanceof InheritableThreadLocal)) {
tag = new InheritableThreadLocal<>();
}

if (!(data instanceof InheritableThreadLocal)) {
data = new InheritableThreadLocal<>();
}
}

/**
* 获取线程中的流量标签
*
* @return 流量标签
*/
public static TrafficTag getTrafficTag() {
return TAG.get();
return tag.get();
}

/**
* 获取线程中的流量信息
*
* @return 流量信息
*/
public static TrafficData getTrafficData() {
return data.get();
}

/**
* 更新线程中的流量标签
*
* @param tag 流量标签map
* @param tagMap 流量标签map
*/
public static void updateTrafficTag(Map<String, List<String>> tag) {
if (MapUtils.isEmpty(tag)) {
public static void updateTrafficTag(Map<String, List<String>> tagMap) {
if (MapUtils.isEmpty(tagMap)) {
return;
}
TrafficTag trafficTag = TAG.get();
TrafficTag trafficTag = TrafficUtils.tag.get();
if (trafficTag == null) {
TAG.set(new TrafficTag(tag));
TrafficUtils.tag.set(new TrafficTag(tagMap));
return;
}
trafficTag.updateTag(tag);
trafficTag.updateTag(tagMap);
}

/**
Expand All @@ -68,13 +92,29 @@ public static void setTrafficTag(TrafficTag trafficTag) {
if (trafficTag == null) {
return;
}
TAG.set(trafficTag);
tag.set(trafficTag);
}

/**
* 删除线程变量
*/
public static void removeTrafficTag() {
TAG.remove();
tag.remove();
}

/**
* 流量信息存入线程变量
*
* @param value 线程变量
*/
public static void setTrafficData(TrafficData value) {
data.set(value);
}

/**
* 删除流量信息
*/
public static void removeTrafficData() {
data.remove();
}
}
16 changes: 14 additions & 2 deletions sermant-plugins/sermant-tag-transmission/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
tag.transmission.plugin:
# 流量标签在各种通道间(http/rpc/消息队列等)传递的配置
tag.transmission.config:
# 是否开启流量标签透传
enabled: true
tagKeys: [id,name]
# 需要透传的流量标签的key
tagKeys: [id,name]

# 跨线程传递标签的配置,该能力可单独使用
crossthread.config:
# 是否在直接new Thread时传递标签
enabled-thread: true
# 是否在非定时线程池中传递标签
enabled-thread-pool: true
# 是否在定时线程池的schedule/scheduleAtFixedRate/scheduleWithFixedDelay方法中传递标签
enabled-scheduler: true
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.config;

import com.huaweicloud.sermant.core.config.common.ConfigFieldKey;
import com.huaweicloud.sermant.core.config.common.ConfigTypeKey;
import com.huaweicloud.sermant.core.plugin.config.PluginConfig;

/**
* 跨线程传递开关配置
*
* @author lilai
* @since 2023-08-02
*/
@ConfigTypeKey("crossthread.config")
public class CrossThreadConfig implements PluginConfig {
/**
* 是否在非定时线程池中传递标签
*/
@ConfigFieldKey("enabled-thread-pool")
private boolean enabledThreadPool;

/**
* 是否在定时线程池的schedule/scheduleAtFixedRate/scheduleWithFixedDelay方法中传递标签
*/
@ConfigFieldKey("enabled-scheduler")
private boolean enabledScheduler;

/**
* 是否在直接new Thread时传递标签
*/
@ConfigFieldKey("enabled-thread")
private boolean enabledThread;

public boolean isEnabledThread() {
return enabledThread;
}

public void setEnabledThread(boolean enabledThread) {
this.enabledThread = enabledThread;
}

public boolean isEnabledThreadPool() {
return enabledThreadPool;
}

public void setEnabledThreadPool(boolean enabledThreadPool) {
this.enabledThreadPool = enabledThreadPool;
}

public boolean isEnabledScheduler() {
return enabledScheduler;
}

public void setEnabledScheduler(boolean enabledScheduler) {
this.enabledScheduler = enabledScheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.util.List;

/**
* 流量标签透传插件配置
* 流量标签透传配置
*
* @author lilai
* @since 2023-07-17
*/
@ConfigTypeKey("tag.transmission.plugin")
@ConfigTypeKey("tag.transmission.config")
public class TagTransmissionConfig implements PluginConfig {
/**
* 是否开启适配
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* @author lilai
* @since 2023-07-18
*/
public class KafkaProviderDeclarer extends AbstractPluginDeclarer {
public class KafkaProducerDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.crossthread;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.config.CrossThreadConfig;
import com.huaweicloud.sermant.tag.transmission.interceptors.crossthread.ExecutorInterceptor;

/**
* 拦截Executor
*
* @author provenceee
* @since 2023-04-20
*/
public class ExecutorDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "java.util.concurrent.Executor";

private static final String INTERCEPT_CLASS = ExecutorInterceptor.class.getCanonicalName();

private static final String[] METHOD_NAME = {"execute", "submit"};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.isExtendedFrom(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_NAME).and(MethodMatcher.isPublicMethod()),
INTERCEPT_CLASS)
};
}

@Override
public boolean isEnabled() {
CrossThreadConfig config = PluginConfigManager.getPluginConfig(CrossThreadConfig.class);
if (config.isEnabledThread()) {
TrafficUtils.setInheritableThreadLocal();
return true;
}
return config.isEnabledThreadPool();
}
}
Loading

0 comments on commit 15d9bee

Please sign in to comment.