Skip to content

Commit

Permalink
Merge pull request #1311 from lilai23/tag_transmission
Browse files Browse the repository at this point in the history
流量标签透传插件:优化rocketmq和跨线程日志的性能消耗
  • Loading branch information
Sherlockhan authored Sep 18, 2023
2 parents 38cf052 + 79d8736 commit 4bcd82f
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.mq.rocketmq.RocketmqProducerInterceptor;
import com.huaweicloud.sermant.tag.transmission.interceptors.mq.rocketmq.RocketmqProducerSendInterceptor;

/**
* RocketMQ流量标签透传的生产者增强声明,支持RocketMQ4.8+
*
* @author tangle
* @since 2023-07-20
*/
public class RocketmqProducerDeclarer extends AbstractPluginDeclarer {
public class RocketmqProducerSendDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名、拦截器、拦截方法
*/
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.MQClientAPIImpl";

private static final String INTERCEPT_CLASS = RocketmqProducerInterceptor.class.getCanonicalName();

private static final String METHOD_NAME = "sendMessage";

private static final int PARAM_INDEX = 12;
Expand All @@ -49,7 +47,7 @@ public ClassMatcher getClassMatcher() {
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME)
.and(MethodMatcher.paramCountEquals(PARAM_INDEX)), INTERCEPT_CLASS)
.and(MethodMatcher.paramCountEquals(PARAM_INDEX)), new RocketmqProducerSendInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.mq.rocketmq;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.mq.rocketmq.RocketmqProducerStartInterceptor;

/**
* RocketMQ流量标签透传的生产者启动时增强声明,支持RocketMQ4.8+
*
* @author lilai
* @since 2023-09-16
*/
public class RocketmqProducerStartDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名、拦截器、拦截方法
*/
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.producer.MQProducer";

private static final String METHOD_NAME = "start";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.isExtendedFrom(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), new RocketmqProducerStartInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@
public abstract class AbstractExecutorInterceptor extends AbstractInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger();

private static final String RUNNABLE_WRAPPER_CLASS_NAME = RunnableWrapper.class.getCanonicalName();

private static final String CALLABLE_WRAPPER_CLASS_NAME = CallableWrapper.class.getCanonicalName();

private static final String RUNNABLE_AND_CALLABLE_WRAPPER_CLASS_NAME =
RunnableAndCallableWrapper.class.getCanonicalName();

private final boolean cannotTransmit;

/**
Expand Down Expand Up @@ -86,7 +93,7 @@ private ExecuteContext buildCallableWrapper(ExecuteContext context, Object[] arg
TrafficMessage trafficMessage,
Object argument,
String executorName) {
log(argument, trafficMessage, CallableWrapper.class.getCanonicalName());
log(argument, trafficMessage, CALLABLE_WRAPPER_CLASS_NAME);
arguments[0] = new CallableWrapper<>((Callable<?>) argument, trafficMessage,
cannotTransmit, executorName);
return context;
Expand All @@ -96,25 +103,27 @@ private ExecuteContext buildRunnableWrapper(ExecuteContext context, Object[] arg
TrafficMessage trafficMessage,
Object argument,
String executorName) {
log(argument, trafficMessage, RunnableWrapper.class.getCanonicalName());
log(argument, trafficMessage, RUNNABLE_WRAPPER_CLASS_NAME);
arguments[0] = new RunnableWrapper<>((Runnable) argument, trafficMessage,
cannotTransmit, executorName);
return context;
}

private ExecuteContext buildRunnableAndCallableWrapper(ExecuteContext context, Object[] arguments,
TrafficMessage trafficMessage, Object argument, String executorName) {
log(argument, trafficMessage, RunnableAndCallableWrapper.class.getCanonicalName());
log(argument, trafficMessage, RUNNABLE_AND_CALLABLE_WRAPPER_CLASS_NAME);
arguments[0] = new RunnableAndCallableWrapper<>((Runnable) argument, (Callable<?>) argument,
trafficMessage, cannotTransmit, executorName);
return context;
}

private void log(Object argument, TrafficMessage trafficMessage, String wrapperClassName) {
LOGGER.log(Level.FINE, "Class name is {0}, hash code is {1}, trafficTag is {2}, "
+ "trafficData is {3}, will be converted to {4}.",
new Object[]{argument.getClass().getName(), Integer.toHexString(argument.hashCode()),
trafficMessage.getTrafficTag(), trafficMessage.getTrafficData(), wrapperClassName});
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "Class name is {0}, hash code is {1}, trafficTag is {2}, "
+ "trafficData is {3}, will be converted to {4}.",
new Object[]{argument.getClass().getName(), Integer.toHexString(argument.hashCode()),
trafficMessage.getTrafficTag(), trafficMessage.getTrafficData(), wrapperClassName});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.config.strategy.TagKeyMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractServerInterceptor;
import com.huaweicloud.sermant.tag.transmission.utils.RocketmqProducerMarkUtils;

import org.apache.rocketmq.common.message.Message;

Expand Down Expand Up @@ -49,6 +50,9 @@ public class RocketmqConsumerInterceptor extends AbstractServerInterceptor<Messa

@Override
public ExecuteContext doBefore(ExecuteContext context) {
if (RocketmqProducerMarkUtils.isProducer()) {
return context;
}
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
if (!isRocketMqStackTrace(stackTraceElements)) {
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* @author tangle
* @since 2023-07-20
*/
public class RocketmqProducerInterceptor extends AbstractClientInterceptor<SendMessageRequestHeader> {
public class RocketmqProducerSendInterceptor extends AbstractClientInterceptor<SendMessageRequestHeader> {
/**
* SendMessageRequestHeader在sendMessage方法中的参数下标
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors.mq.rocketmq;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import com.huaweicloud.sermant.tag.transmission.utils.RocketmqProducerMarkUtils;

/**
* RocketMQ流量标签透传的生产者启动时的拦截器,支持RocketMQ4.8+
*
* @author lilai
* @since 2023-09-16
*/
public class RocketmqProducerStartInterceptor extends AbstractInterceptor {
@Override
public ExecuteContext before(ExecuteContext context) throws Exception {
return context;
}

@Override
public ExecuteContext after(ExecuteContext context) throws Exception {
// 标记生产者线程,防止进入consumer的拦截点
RocketmqProducerMarkUtils.setProducerMark();
return context;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.utils;

/**
* RocketMQ的生产者线程标记工具类
*
* @author lilai
* @since 2023-09-16
*/
public class RocketmqProducerMarkUtils {
/**
* 生产者线程标记
*/
private static final ThreadLocal<Boolean> PRODUCER_MARK = new ThreadLocal<>();

private RocketmqProducerMarkUtils() {
}

/**
* 标记当前线程为生产者线程
*/
public static void setProducerMark() {
PRODUCER_MARK.set(true);
}

/**
* 判断是否当前线程是否是生产者线程
*
* @return 是否是生产者线程
*/
public static boolean isProducer() {
return PRODUCER_MARK.get() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ protected void before(Object obj) {
if (trafficData != null) {
TrafficUtils.setTrafficData(trafficData);
}
LOGGER.log(Level.FINE, "Current thread is {0}, class name is {1}, hash code is {2}, trafficTag is {3}, "
+ "trafficData is {4}, will be executed.",
new Object[]{Thread.currentThread().getName(), obj.getClass().getName(),
Integer.toHexString(obj.hashCode()), trafficTag, trafficData});
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "Current thread is {0}, class name is {1}, hash code is {2}, trafficTag is {3}, "
+ "trafficData is {4}, will be executed.",
new Object[]{Thread.currentThread().getName(), obj.getClass().getName(),
Integer.toHexString(obj.hashCode()), trafficTag, trafficData});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
com.huaweicloud.sermant.tag.transmission.declarers.http.client.httpclient.HttpClient4xDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.http.server.HttpServletDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.mq.rocketmq.RocketmqConsumerDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.mq.rocketmq.RocketmqProducerDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.mq.rocketmq.RocketmqProducerSendDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.mq.rocketmq.RocketmqProducerStartDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.mq.kafka.KafkaConsumerRecordDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.rpc.dubbo.AlibabaDubboProviderDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.rpc.dubbo.ApacheDubboProviderDeclarer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
* @since 2023-07-27
*/
public class RocketmqProducerInterceptorTest extends BaseInterceptorTest {
private final RocketmqProducerInterceptor interceptor;
private final RocketmqProducerSendInterceptor interceptor;

private final Object[] arguments;

public RocketmqProducerInterceptorTest() {
interceptor = new RocketmqProducerInterceptor();
interceptor = new RocketmqProducerSendInterceptor();
arguments = new Object[12];
}

Expand Down

0 comments on commit 4bcd82f

Please sign in to comment.