Skip to content

Commit

Permalink
流量标签透传插件:适配rocketmq5.x版本
Browse files Browse the repository at this point in the history
  • Loading branch information
daizhenyu committed Sep 25, 2023
1 parent 5960d98 commit 78015a9
Show file tree
Hide file tree
Showing 28 changed files with 592 additions and 19 deletions.
2 changes: 2 additions & 0 deletions sermant-plugins/sermant-tag-transmission/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
<modules>
<module>tag-transmission-plugin</module>
<module>tag-transmission-service</module>
<module>tag-transmission-rocketmq5.x-plugin</module>
<module>tag-transmission-common</module>
</modules>

<properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sermant-tag-transmission</artifactId>
<groupId>com.huaweicloud.sermant</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>tag-transmission-common</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>tag-transmission-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.huaweicloud.sermant.tag.transmission.interceptors.mq.rocketmq.RocketmqConsumerInterceptor;

/**
* RocketMQ流量标签透传的消费者增强声明,支持RocketMQ4.8+
* RocketMQ流量标签透传的消费者增强声明,支持RocketMQ4.x
*
* @author tangle
* @since 2023-07-19
Expand All @@ -34,8 +34,6 @@ public class RocketmqConsumerDeclarer extends AbstractPluginDeclarer {
*/
private static final String ENHANCE_CLASS = "org.apache.rocketmq.common.message.Message";

private static final String INTERCEPT_CLASS = RocketmqConsumerInterceptor.class.getCanonicalName();

private static final String METHOD_NAME = "getBody";

@Override
Expand All @@ -46,7 +44,7 @@ public ClassMatcher getClassMatcher() {
@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), INTERCEPT_CLASS)
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), new RocketmqConsumerInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.huaweicloud.sermant.tag.transmission.interceptors.mq.rocketmq.RocketmqProducerSendInterceptor;

/**
* RocketMQ流量标签透传的生产者增强声明,支持RocketMQ4.8+
* RocketMQ流量标签透传的生产者增强声明,支持RocketMQ4.x
*
* @author tangle
* @since 2023-07-20
Expand All @@ -36,7 +36,20 @@ public class RocketmqProducerSendDeclarer extends AbstractPluginDeclarer {

private static final String METHOD_NAME = "sendMessage";

private static final int PARAM_INDEX = 12;
private static final String[] METHOD_PARAM_TYPES = {
"java.lang.String",
"java.lang.String",
"org.apache.rocketmq.common.message.Message",
"org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader",
"long",
"org.apache.rocketmq.client.impl.CommunicationMode",
"org.apache.rocketmq.client.producer.SendCallback",
"org.apache.rocketmq.client.impl.producer.TopicPublishInfo",
"org.apache.rocketmq.client.impl.factory.MQClientInstance",
"int",
"org.apache.rocketmq.client.hook.SendMessageContext",
"org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl"
};

@Override
public ClassMatcher getClassMatcher() {
Expand All @@ -47,7 +60,7 @@ public ClassMatcher getClassMatcher() {
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME)
.and(MethodMatcher.paramCountEquals(PARAM_INDEX)), new RocketmqProducerSendInterceptor())
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), new RocketmqProducerSendInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.huaweicloud.sermant.tag.transmission.interceptors.mq.rocketmq.RocketmqProducerStartInterceptor;

/**
* RocketMQ流量标签透传的生产者启动时增强声明,支持RocketMQ4.8+
* RocketMQ流量标签透传的生产者启动时增强声明,支持RocketMQ4.x
*
* @author lilai
* @since 2023-09-16
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Set;

/**
* RocketMQ流量标签透传的消费者拦截器,支持RocketMQ4.8+
* RocketMQ流量标签透传的消费者拦截器,支持RocketMQ4.x
*
* @author tangle
* @since 2023-07-19
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.List;

/**
* RocketMQ流量标签透传的生产者拦截器,支持RocketMQ4.8+
* RocketMQ流量标签透传的生产者拦截器,支持RocketMQ4.x
*
* @author tangle
* @since 2023-07-20
Expand Down Expand Up @@ -81,22 +81,16 @@ private String insertTags2Properties(String oldProperties) {
continue;
}
List<String> values = TrafficUtils.getTrafficTag().getTag().get(key);
if (CollectionUtils.isEmpty(values)) {
newProperties.append(key);
newProperties.append(LINK_MARK);
newProperties.append((String) null);
newProperties.append(SPLIT_MARK);
continue;
}
newProperties.append(key);
newProperties.append(LINK_MARK);
newProperties.append(values.get(0));
newProperties.append(CollectionUtils.isEmpty(values) ? null : values.get(0));
newProperties.append(SPLIT_MARK);
}
if (newProperties.length() == 0) {
return oldProperties;
}
if (oldProperties == null || oldProperties.length() == 0) {
// rocketmq的header为空,需要去除新header最后的分隔符
newProperties.deleteCharAt(newProperties.length() - 1);
return newProperties.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.huaweicloud.sermant.tag.transmission.utils.RocketmqProducerMarkUtils;

/**
* RocketMQ流量标签透传的生产者启动时的拦截器,支持RocketMQ4.8+
* RocketMQ流量标签透传的生产者启动时的拦截器,支持RocketMQ4.x+
*
* @author lilai
* @since 2023-09-16
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sermant-tag-transmission</artifactId>
<groupId>com.huaweicloud.sermant</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>tag-transmission-rocketmq5.x-plugin</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<config.skip.flag>false</config.skip.flag>
<package.plugin.type>plugin</package.plugin.type>
<rocketmq-clientv5.version>5.1.0</rocketmq-clientv5.version>
</properties>

<dependencies>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>tag-transmission-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-clientv5.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.rocketmqv5.declarers;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.rocketmqv5.interceptor.RocketmqConsumerInterceptor;

/**
* RocketMQ流量标签透传的消费者增强声明,支持RocketMQ5.0+
*
* @author tangle
* @since 2023-07-19
*/
public class RocketmqConsumerDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名、拦截器、拦截方法
*/
private static final String ENHANCE_CLASS = "org.apache.rocketmq.common.message.Message";

private static final String METHOD_NAME = "getBody";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), new RocketmqConsumerInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.rocketmqv5.declarers;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.rocketmqv5.interceptor.RocketmqProducerSendInterceptor;

/**
* RocketMQ流量标签透传的生产者增强声明,支持RocketMQ5.0+
*
* @author tangle
* @since 2023-07-20
*/
public class RocketmqProducerSendDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名、拦截器、拦截方法
*/
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.MQClientAPIImpl";

private static final String METHOD_NAME = "sendMessage";

private static final String[] METHOD_PARAM_TYPES = {
"java.lang.String",
"java.lang.String",
"org.apache.rocketmq.common.message.Message",
"org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader",
"long",
"org.apache.rocketmq.client.impl.CommunicationMode",
"org.apache.rocketmq.client.producer.SendCallback",
"org.apache.rocketmq.client.impl.producer.TopicPublishInfo",
"org.apache.rocketmq.client.impl.factory.MQClientInstance",
"int",
"org.apache.rocketmq.client.hook.SendMessageContext",
"org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl"
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME)
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), new RocketmqProducerSendInterceptor())
};
}
}
Loading

0 comments on commit 78015a9

Please sign in to comment.