From ed99a97b7efd5adcdcee6c32eea6e15a7d9e1cd2 Mon Sep 17 00:00:00 2001 From: luanwenfei Date: Sat, 25 Nov 2023 16:26:12 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=BC=80=E6=BA=90=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E6=BC=8F=E6=B4=9E=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: luanwenfei --- pom.xml | 4 ++-- .../sermant-router/spring-router-plugin/pom.xml | 16 ++++++++++++++-- .../spring-cloud-registry-plugin/pom.xml | 12 ++++++++++++ .../springboot-registry-plugin/pom.xml | 4 ++-- .../pom.xml | 2 +- 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index a101259ca7..ececb3d133 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 3.6.3 1.18.22 8.0.1 - 1.3.2 + 2.7 0.8.8 0.16.0 @@ -59,7 +59,7 @@ 2.17.2 1.2.9 - 4.12 + 4.13.1 5.8.1 3.9.0 3.9.0 diff --git a/sermant-plugins/sermant-router/spring-router-plugin/pom.xml b/sermant-plugins/sermant-router/spring-router-plugin/pom.xml index 727a18cc02..5ec831122e 100644 --- a/sermant-plugins/sermant-router/spring-router-plugin/pom.xml +++ b/sermant-plugins/sermant-router/spring-router-plugin/pom.xml @@ -20,11 +20,12 @@ 1.10.7 1.4.3 1.3.1 + 2.7 5.2.0.RELEASE 9.0.43 - 4.1.0 + 4.11.0 2.7.5 - 4.3 + 4.5.13 4.1.5 @@ -98,8 +99,19 @@ org.mockito mockito-all + + commons-io + commons-io + + + + commons-io + commons-io + ${common-io.version} + provided + com.squareup.okhttp3 okhttp diff --git a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/pom.xml b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/pom.xml index 25a6b688bd..1116397557 100644 --- a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/pom.xml +++ b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/pom.xml @@ -22,6 +22,7 @@ 10.0.0 1.3.0 2.8.9 + 2.7 @@ -101,8 +102,19 @@ org.mockito mockito-all + + commons-io + commons-io + + + + commons-io + commons-io + ${common-io.version} + provided + com.huaweicloud.sermant registry-common diff --git a/sermant-plugins/sermant-springboot-registry/springboot-registry-plugin/pom.xml b/sermant-plugins/sermant-springboot-registry/springboot-registry-plugin/pom.xml index 849847fa25..08c8d15e70 100644 --- a/sermant-plugins/sermant-springboot-registry/springboot-registry-plugin/pom.xml +++ b/sermant-plugins/sermant-springboot-registry/springboot-registry-plugin/pom.xml @@ -19,9 +19,9 @@ plugin 5.2.9.RELEASE 4.1.5 - 4.1.0 + 4.11.0 2.7.5 - 4.5.12 + 4.5.13 5.1.1 9.4.11.v20180605 10.2.3 diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-httpclient4.x-plugin/pom.xml b/sermant-plugins/sermant-tag-transmission/tag-transmission-httpclient4.x-plugin/pom.xml index 23abdd6fbf..fc12e02ead 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-httpclient4.x-plugin/pom.xml +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-httpclient4.x-plugin/pom.xml @@ -16,7 +16,7 @@ 8 false plugin - 4.3 + 4.5.13 From 599ceef5a55a39d209a04247395e08e1d08de0d4 Mon Sep 17 00:00:00 2001 From: lilai Date: Wed, 6 Dec 2023 10:21:06 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97?= =?UTF-8?q?=E7=A6=81=E6=B6=88=E8=B4=B9=E6=8F=92=E4=BB=B6=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: lilai --- .../common/plugin-change-check/action.yml | 11 +- .../config/plugins.yaml | 2 +- .../config/test/plugins.yaml | 2 +- sermant-plugins/pom.xml | 6 +- .../config/config.yaml | 3 - .../KafkaConsumerMethodInterceptor.java | 51 ---- .../matcher/KafkaConsumerMethodMatcher.java | 52 ---- .../sermant/kafka/mock/MockKafkaConsumer.java | 261 ------------------ .../message-common/pom.xml | 27 -- .../common/config/DenyConsumeConfig.java | 52 ---- .../message/common/utils/MockUtils.java | 66 ----- ...ud.sermant.core.plugin.config.PluginConfig | 1 - .../rabbitmq-consumer-plugin/pom.xml | 63 ----- .../declarer/RabbitmqChannelDeclarer.java | 69 ----- .../RabbitmqChannelInterceptor.java | 57 ---- ....core.plugin.agent.declarer.PluginDeclarer | 1 - .../config-service/pom.xml | 19 ++ .../config/config.yaml | 7 + .../consumer-controller/pom.xml | 33 +++ .../kafka/cache/KafkaConsumerCache.java | 76 +++++ .../kafka/cache/KafkaConsumerWrapper.java | 26 ++ .../controller/KafkaConsumerController.java | 62 +++++ .../kafka/extension/KafkaConsumerHandler.java | 41 +++ .../kafka-1.x-plugin}/pom.xml | 26 +- .../kafka/declarer/KafkaConsumerDeclarer.java | 24 +- .../KafkaConsumerConstructorInterceptor.java | 94 +++++++ .../KafkaConsumerPollInterceptor.java} | 36 ++- .../kafka/utils/KafkaEnhancementHelper.java | 89 ++++++ ....core.plugin.agent.declarer.PluginDeclarer | 0 .../pom.xml | 26 +- 30 files changed, 510 insertions(+), 773 deletions(-) delete mode 100644 sermant-plugins/sermant-mq-consume-deny/config/config.yaml delete mode 100644 sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerMethodInterceptor.java delete mode 100644 sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/matcher/KafkaConsumerMethodMatcher.java delete mode 100644 sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/mock/MockKafkaConsumer.java delete mode 100644 sermant-plugins/sermant-mq-consume-deny/message-common/pom.xml delete mode 100644 sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/config/DenyConsumeConfig.java delete mode 100644 sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/utils/MockUtils.java delete mode 100644 sermant-plugins/sermant-mq-consume-deny/message-common/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.config.PluginConfig delete mode 100644 sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/pom.xml delete mode 100644 sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/declarer/RabbitmqChannelDeclarer.java delete mode 100644 sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/interceptor/RabbitmqChannelInterceptor.java delete mode 100644 sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/config-service/pom.xml create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/config/config.yaml create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java rename sermant-plugins/{sermant-mq-consume-deny/kafka-consumer-plugin => sermant-mq-consume-prohibition/kafka-1.x-plugin}/pom.xml (77%) rename sermant-plugins/{sermant-mq-consume-deny/kafka-consumer-plugin => sermant-mq-consume-prohibition/kafka-1.x-plugin}/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java (54%) create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java rename sermant-plugins/{sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java => sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java} (53%) create mode 100644 sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java rename sermant-plugins/{sermant-mq-consume-deny/kafka-consumer-plugin => sermant-mq-consume-prohibition/kafka-1.x-plugin}/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer (100%) rename sermant-plugins/{sermant-mq-consume-deny => sermant-mq-consume-prohibition}/pom.xml (62%) diff --git a/.github/actions/common/plugin-change-check/action.yml b/.github/actions/common/plugin-change-check/action.yml index b5d17f700e..cd2bfc592b 100644 --- a/.github/actions/common/plugin-change-check/action.yml +++ b/.github/actions/common/plugin-change-check/action.yml @@ -76,13 +76,14 @@ runs: run: | echo "sermantSpringbootRegistryChanged=${{ steps.changed-sermant-springboot-registry.outputs.changed }}" >> $GITHUB_ENV - uses: marceloprado/has-changed-path@v1.0.1 - id: changed-sermant-mq-consumer-deny + id: changed-sermant-mq-consume-prohibition with: - paths: sermant-plugins/sermant-mq-consumer-deny - - name: env sermant-sermant-mq-consumer-deny + paths: sermant-plugins/sermant-mq-consume-prohibition + - name: env sermant-sermant-mq-consume-prohibition shell: bash run: | - echo "sermantMqConsumerDenyChanged=${{ steps.changed-sermant-mq-consumer-deny.outputs.changed }}" >> $GITHUB_ENV + echo "sermantMqConsumeProhibitionChanged=${{ steps.changed-sermant-mq-consume-prohibition.outputs.changed + }}" >> $GITHUB_ENV - uses: marceloprado/has-changed-path@v1.0.1 id: changed-sermant-removal with: @@ -172,7 +173,7 @@ runs: ${{ env.sermantSpringbootRegistryChanged }} == 'true' -o ${{ env.sermantServiceRegistryChanged }} == 'true' ];then echo "enableSpringLane=true" >> $GITHUB_ENV fi - + # ==========removal is needed to test?========== if [ ${{ env.sermantAgentCoreChanged }} == 'true' -o ${{ env.sermantServiceRemovalChanged }} == 'true' -o \ ${{ env.sermantSpringbootRegistryChanged }} == 'true' -o ${{ env.sermantServiceRegistryChanged }} == 'true' ];then diff --git a/sermant-agentcore/sermant-agentcore-config/config/plugins.yaml b/sermant-agentcore/sermant-agentcore-config/config/plugins.yaml index 62df8b6e4f..640303395f 100644 --- a/sermant-agentcore/sermant-agentcore-config/config/plugins.yaml +++ b/sermant-agentcore/sermant-agentcore-config/config/plugins.yaml @@ -13,7 +13,7 @@ plugins: - dynamic-config - monitor - springboot-registry - - mq-consume-deny + - mq-consume-prohibition - service-removal - service-visibility - tag-transmission diff --git a/sermant-agentcore/sermant-agentcore-config/config/test/plugins.yaml b/sermant-agentcore/sermant-agentcore-config/config/test/plugins.yaml index 62df8b6e4f..640303395f 100644 --- a/sermant-agentcore/sermant-agentcore-config/config/test/plugins.yaml +++ b/sermant-agentcore/sermant-agentcore-config/config/test/plugins.yaml @@ -13,7 +13,7 @@ plugins: - dynamic-config - monitor - springboot-registry - - mq-consume-deny + - mq-consume-prohibition - service-removal - service-visibility - tag-transmission diff --git a/sermant-plugins/pom.xml b/sermant-plugins/pom.xml index b72f9041fa..b74cfcbef1 100644 --- a/sermant-plugins/pom.xml +++ b/sermant-plugins/pom.xml @@ -36,7 +36,7 @@ sermant-router sermant-loadbalancer sermant-springboot-registry - sermant-mq-consume-deny + sermant-mq-consume-prohibition sermant-monitor sermant-service-visibility sermant-service-removal @@ -69,7 +69,7 @@ sermant-dynamic-config sermant-monitor sermant-springboot-registry - sermant-mq-consume-deny + sermant-mq-consume-prohibition sermant-service-visibility sermant-service-removal sermant-spring-beans-deal @@ -101,7 +101,7 @@ sermant-loadbalancer sermant-monitor sermant-springboot-registry - sermant-mq-consume-deny + sermant-mq-consume-prohibition sermant-service-visibility sermant-service-removal sermant-spring-beans-deal diff --git a/sermant-plugins/sermant-mq-consume-deny/config/config.yaml b/sermant-plugins/sermant-mq-consume-deny/config/config.yaml deleted file mode 100644 index 066f2c7406..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/config/config.yaml +++ /dev/null @@ -1,3 +0,0 @@ -deny.consume.plugin: - enableRabbitmqDeny: false - enableKafkaDeny: false diff --git a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerMethodInterceptor.java b/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerMethodInterceptor.java deleted file mode 100644 index 861f21e9cf..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerMethodInterceptor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package com.huaweicloud.sermant.kafka.interceptor; - -import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; -import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; -import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; -import com.huaweicloud.sermant.kafka.mock.MockKafkaConsumer; -import com.huaweicloud.sermant.message.common.config.DenyConsumeConfig; -import com.huaweicloud.sermant.message.common.utils.MockUtils; - -import org.apache.kafka.clients.consumer.Consumer; - -import java.util.Optional; - -/** - * KafkaConsumer 禁消费的一个增强拦截器,只处理非close方法
- * - * @author yuzl 俞真龙 - * @since 2022-10-09 - */ -public class KafkaConsumerMethodInterceptor extends AbstractInterceptor { - private final Consumer mockConsumer = new MockKafkaConsumer<>(); - - @Override - public ExecuteContext before(ExecuteContext context) { - final DenyConsumeConfig pluginConfig = PluginConfigManager.getPluginConfig(DenyConsumeConfig.class); - if (pluginConfig.isEnableKafkaDeny()) { - Optional resultOption = - MockUtils.invokeMethod(mockConsumer, context.getMethod(), context.getArguments()); - context.skip(resultOption.orElse(null)); - } - return context; - } - - @Override - public ExecuteContext after(ExecuteContext context) { - return context; - } -} diff --git a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/matcher/KafkaConsumerMethodMatcher.java b/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/matcher/KafkaConsumerMethodMatcher.java deleted file mode 100644 index 5fa9b430c6..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/matcher/KafkaConsumerMethodMatcher.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.huaweicloud.sermant.kafka.matcher; - -import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; - -import net.bytebuddy.description.method.MethodDescription; - -import java.util.Arrays; -import java.util.List; - -/** - * 一个自定义的方法匹配器
- * - * @author yuzl 俞真龙 - * @since 2022-10-09 - */ -public abstract class KafkaConsumerMethodMatcher extends MethodMatcher { - private static final List INCLUDE_METHOD = Arrays.asList("assignment", "subscription", "subscribe", - "assign", "unsubscribe", "poll", "commitSync", "seek", "seekToBeginning", "seekToEnd", "position", "committed", - "metrics", "partitionsFor", "listTopics", "paused", "pause", "resume", "offsetsForTimes", "beginningOffsets", - "endOffsets", "groupMetadata", "enforceRebalance", "wakeup"); - - /** - * 非close方法,都需要增强 - * - * @return {@link MethodMatcher} - */ - public static MethodMatcher matchKafkaMethod() { - return new MethodMatcher() { - @Override - public boolean matches(MethodDescription methodDescription) { - return methodDescription.isPublic() && !methodDescription.isStatic() - && INCLUDE_METHOD.contains(methodDescription.getActualName()); - } - }; - } -} diff --git a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/mock/MockKafkaConsumer.java b/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/mock/MockKafkaConsumer.java deleted file mode 100644 index 25e2430e9e..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/mock/MockKafkaConsumer.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.huaweicloud.sermant.kafka.mock; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; - -import java.time.Duration; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; - -/** - * 用于kafka禁写的mock的kafka consumer
- * 非close方法都执行该mock的consumer,而close方法不能增强 - * - * @author yuzl 俞真龙 - * @param key - * @param value - * @since 2022-10-09 - */ -public class MockKafkaConsumer implements Consumer { - @Override - public Set assignment() { - return Collections.emptySet(); - } - - @Override - public Set subscription() { - return Collections.emptySet(); - } - - @Override - public void assign(Collection partitions) { - } - - @Override - public void subscribe(Collection topics) { - } - - @Override - public void subscribe(Collection topics, ConsumerRebalanceListener callback) { - } - - @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - } - - @Override - public void subscribe(Pattern pattern) { - } - - @Override - public void unsubscribe() { - } - - @Override - public ConsumerRecords poll(long timeout) { - return new ConsumerRecords<>(Collections.emptyMap()); - } - - @Override - public ConsumerRecords poll(Duration timeout) { - return new ConsumerRecords<>(Collections.emptyMap()); - } - - @Override - public void commitSync() { - } - - @Override - public void commitSync(Duration timeout) { - } - - @Override - public void commitSync(Map offsets) { - } - - @Override - public void commitSync(Map offsets, Duration timeout) { - } - - @Override - public void commitAsync() { - } - - @Override - public void commitAsync(OffsetCommitCallback callback) { - } - - @Override - public void commitAsync(Map offsets, OffsetCommitCallback callback) { - } - - @Override - public void seek(TopicPartition partition, long offset) { - } - - @Override - public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) { - } - - @Override - public void seekToBeginning(Collection partitions) { - } - - @Override - public void seekToEnd(Collection partitions) { - } - - @Override - public long position(TopicPartition partition) { - return 0; - } - - @Override - public long position(TopicPartition partition, Duration timeout) { - return 0; - } - - @Override - public OffsetAndMetadata committed(TopicPartition partition) { - return committed(partition, Duration.ZERO); - } - - @Override - public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) { - return new OffsetAndMetadata(0); - } - - @Override - public Map committed(Set partitions) { - return Collections.emptyMap(); - } - - @Override - public Map committed(Set partitions, Duration timeout) { - return Collections.emptyMap(); - } - - @Override - public Map metrics() { - return Collections.emptyMap(); - } - - @Override - public List partitionsFor(String topic) { - return Collections.emptyList(); - } - - @Override - public List partitionsFor(String topic, Duration timeout) { - return Collections.emptyList(); - } - - @Override - public Map> listTopics() { - return Collections.emptyMap(); - } - - @Override - public Map> listTopics(Duration timeout) { - return Collections.emptyMap(); - } - - @Override - public Set paused() { - return Collections.emptySet(); - } - - @Override - public void pause(Collection partitions) { - } - - @Override - public void resume(Collection partitions) { - } - - @Override - public Map offsetsForTimes(Map timestampsToSearch) { - return Collections.emptyMap(); - } - - @Override - public Map offsetsForTimes(Map timestampsToSearch, - Duration timeout) { - return Collections.emptyMap(); - } - - @Override - public Map beginningOffsets(Collection partitions) { - return Collections.emptyMap(); - } - - @Override - public Map beginningOffsets(Collection partitions, Duration timeout) { - return Collections.emptyMap(); - } - - @Override - public Map endOffsets(Collection partitions) { - return Collections.emptyMap(); - } - - @Override - public Map endOffsets(Collection partitions, Duration timeout) { - return Collections.emptyMap(); - } - - @Override - public ConsumerGroupMetadata groupMetadata() { - return new ConsumerGroupMetadata(""); - } - - @Override - public void enforceRebalance() { - } - - @Override - public void close() { - } - - @Override - public void close(long timeout, TimeUnit unit) { - } - - @Override - public void close(Duration timeout) { - } - - @Override - public void wakeup() { - } -} diff --git a/sermant-plugins/sermant-mq-consume-deny/message-common/pom.xml b/sermant-plugins/sermant-mq-consume-deny/message-common/pom.xml deleted file mode 100644 index aca45ffa89..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/message-common/pom.xml +++ /dev/null @@ -1,27 +0,0 @@ - - - - - sermant-mq-consume-deny - com.huaweicloud.sermant - 1.0.0 - - 4.0.0 - - message-common - message-common - - - - junit - junit - test - - - com.huaweicloud.sermant - sermant-agentcore-core - provided - - - diff --git a/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/config/DenyConsumeConfig.java b/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/config/DenyConsumeConfig.java deleted file mode 100644 index d3ce9ff47c..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/config/DenyConsumeConfig.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package com.huaweicloud.sermant.message.common.config; - -import com.huaweicloud.sermant.core.config.common.ConfigTypeKey; -import com.huaweicloud.sermant.core.plugin.config.PluginConfig; - -/** - * 禁消费的启用配置类
- * - * @author yuzl 俞真龙 - * @since 2022-10-13 - */ -@ConfigTypeKey("deny.consume.plugin") -public class DenyConsumeConfig implements PluginConfig { - /** - * 是否开启rabbitmq禁消费 - */ - private boolean enableRabbitmqDeny; - - /** - * 是否开启kafka禁消费 - */ - private boolean enableKafkaDeny; - - public boolean isEnableRabbitmqDeny() { - return enableRabbitmqDeny; - } - - public void setEnableRabbitmqDeny(boolean enableRabbitmqDeny) { - this.enableRabbitmqDeny = enableRabbitmqDeny; - } - - public boolean isEnableKafkaDeny() { - return enableKafkaDeny; - } - - public void setEnableKafkaDeny(boolean enableKafkaDeny) { - this.enableKafkaDeny = enableKafkaDeny; - } -} diff --git a/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/utils/MockUtils.java b/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/utils/MockUtils.java deleted file mode 100644 index 7bb248b31e..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/java/com/huaweicloud/sermant/message/common/utils/MockUtils.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.huaweicloud.sermant.message.common.utils; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 用于缓存增强类的元数据,并且反射调用mock实例的方法的一个工具类
- * - * @author yuzl 俞真龙 - * @since 2022-10-11 - */ -public final class MockUtils { - private static final Map> METHODS = new ConcurrentHashMap<>(); - - private MockUtils() { - } - - /** - * 调用方法 - * - * @param mockObj 模拟obj - * @param method 方法 - * @param arguments 参数 - * @return {@link Object} - */ - public static Optional invokeMethod(Object mockObj, Method method, Object[] arguments) { - Optional methodOptional = METHODS.computeIfAbsent(method, methodsMapKey -> getMethod(mockObj, - methodsMapKey)); - if (!methodOptional.isPresent()) { - return Optional.empty(); - } - try { - return Optional.ofNullable(methodOptional.get().invoke(mockObj, arguments)); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - return Optional.empty(); - } - } - - private static Optional getMethod(Object mockObj, Method method) { - try { - return Optional - .ofNullable(mockObj.getClass().getDeclaredMethod(method.getName(), method.getParameterTypes())); - } catch (NoSuchMethodException e) { - return Optional.empty(); - } - } -} diff --git a/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.config.PluginConfig b/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.config.PluginConfig deleted file mode 100644 index 9d3ca7020b..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/message-common/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.config.PluginConfig +++ /dev/null @@ -1 +0,0 @@ -com.huaweicloud.sermant.message.common.config.DenyConsumeConfig \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/pom.xml b/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/pom.xml deleted file mode 100644 index fd3010a760..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/pom.xml +++ /dev/null @@ -1,63 +0,0 @@ - - - - - sermant-mq-consume-deny - com.huaweicloud.sermant - 1.0.0 - - 4.0.0 - - rabbitmq-consumer-plugin - - rabbitmq-consumer-plugin - - - 8 - 8 - UTF-8 - plugin - false - 5.8.0 - - - - junit - junit - test - - - com.huaweicloud.sermant - message-common - - - com.rabbitmq - amqp-client - ${amqp.client.version} - provided - - - com.huaweicloud.sermant - sermant-agentcore-core - provided - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 8 - 8 - - - - org.apache.maven.plugins - maven-shade-plugin - - - - diff --git a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/declarer/RabbitmqChannelDeclarer.java b/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/declarer/RabbitmqChannelDeclarer.java deleted file mode 100644 index 81b1405bfa..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/declarer/RabbitmqChannelDeclarer.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.huaweicloud.sermant.rabbitmq.declarer; - -import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; -import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; -import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; -import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; -import com.huaweicloud.sermant.rabbitmq.interceptor.RabbitmqChannelInterceptor; - -import net.bytebuddy.description.method.MethodDescription; - -import java.util.Arrays; -import java.util.List; - -/** - * 对rabbit mq的禁消费的一个增强定义
- * - * @author yuzl 俞真龙 - * @since 2022-10-11 - */ -public class RabbitmqChannelDeclarer extends AbstractPluginDeclarer { - /** - * basicConsume方法名 - */ - public static final String BASIC_CONSUME = "basicConsume"; - - /** - * basicAck方法名 - */ - public static final String BASIC_ACK = "basicAck"; - - private static final String ENHANCE_CLASS = "com.rabbitmq.client.impl.ChannelN"; - - private static final List ENHANCE_METHODS = Arrays.asList(BASIC_CONSUME, BASIC_ACK); - - @Override - public ClassMatcher getClassMatcher() { - return ClassMatcher.nameEquals(ENHANCE_CLASS); - } - - @Override - public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { - return new InterceptDeclarer[]{InterceptDeclarer.build(rabbitmqMatcher(), new RabbitmqChannelInterceptor())}; - } - - private MethodMatcher rabbitmqMatcher() { - return new MethodMatcher() { - @Override - public boolean matches(MethodDescription methodDescription) { - return ENHANCE_METHODS.contains(methodDescription.getActualName()); - } - }; - } -} diff --git a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/interceptor/RabbitmqChannelInterceptor.java b/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/interceptor/RabbitmqChannelInterceptor.java deleted file mode 100644 index 13777ef075..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/java/com/huaweicloud/sermant/rabbitmq/interceptor/RabbitmqChannelInterceptor.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.huaweicloud.sermant.rabbitmq.interceptor; - -import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; -import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; -import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; -import com.huaweicloud.sermant.message.common.config.DenyConsumeConfig; -import com.huaweicloud.sermant.rabbitmq.declarer.RabbitmqChannelDeclarer; - -/** - * 用于对rabbit mq的一个拦截操作
- * - * @author yuzl 俞真龙 - * @since 2022-10-11 - */ -public class RabbitmqChannelInterceptor extends AbstractInterceptor { - private static final String MOCK_CONSUMER_TAG = ""; - private static final String MOCK_VOID = null; - - @Override - public ExecuteContext before(ExecuteContext context) { - final DenyConsumeConfig pluginConfig = PluginConfigManager.getPluginConfig(DenyConsumeConfig.class); - if (pluginConfig.isEnableRabbitmqDeny()) { - skip(context); - } - return context; - } - - private void skip(ExecuteContext context) { - String methodName = context.getMethod().getName(); - if (RabbitmqChannelDeclarer.BASIC_CONSUME.equals(methodName)) { - context.skip(MOCK_CONSUMER_TAG); - } else if (RabbitmqChannelDeclarer.BASIC_ACK.equals(methodName)) { - context.skip(MOCK_VOID); - } - } - - @Override - public ExecuteContext after(ExecuteContext context) { - return context; - } -} diff --git a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer deleted file mode 100644 index 5add5ce967..0000000000 --- a/sermant-plugins/sermant-mq-consume-deny/rabbitmq-consumer-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer +++ /dev/null @@ -1 +0,0 @@ -com.huaweicloud.sermant.rabbitmq.declarer.RabbitmqChannelDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/config-service/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/config-service/pom.xml new file mode 100644 index 0000000000..28a3e68fd1 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/config-service/pom.xml @@ -0,0 +1,19 @@ + + + + sermant-mq-consume-prohibition + com.huaweicloud.sermant + 1.0.0 + + 4.0.0 + + config-service + + + 8 + 8 + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/config/config.yaml b/sermant-plugins/sermant-mq-consume-prohibition/config/config.yaml new file mode 100644 index 0000000000..43f0026d9b --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/config/config.yaml @@ -0,0 +1,7 @@ +kafka: + enableProhibition: true + topic: + - test1 + - test2 + rocketMq: + enableProhibition: true \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml new file mode 100644 index 0000000000..9a0d40ac7f --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml @@ -0,0 +1,33 @@ + + + + sermant-mq-consume-prohibition + com.huaweicloud.sermant + 1.0.0 + + 4.0.0 + + consumer-controller + + + com.huaweicloud.sermant + sermant-agentcore-god + 1.0.0 + compile + + + org.apache.kafka + kafka-clients + 1.1.0 + provided + + + + + 8 + 8 + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java new file mode 100644 index 0000000000..882f80bb79 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.cache; + +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * KafkaConsumer缓存 + * + * @author lilai + * @since 2023-12-05 + */ +public enum KafkaConsumerCache { + /** + * 单例 + */ + INSTANCE; + + /** + * 消费者缓存 + */ + private final Set kafkaConsumerCache = new CopyOnWriteArraySet<>(); + + KafkaConsumerCache() { + init(); + } + + private void init() { + + } + + /** + * 获取消费者缓存 + * + * @return 消费者缓存 + */ + public Set getCache() { + return kafkaConsumerCache; + } + + /** + * 更新Kafka消费者缓存列表 + * + * @param kafkaConsumer 消费者实例 + */ + public void updateCache(KafkaConsumer kafkaConsumer) { + kafkaConsumerCache.add(convert(kafkaConsumer)); + } + + /** + * 消费者实例转换 + * + * @param kafkaConsumer 原始消费者实例 + * @return 消费者包装实例 + */ + private KafkaConsumerWrapper convert(KafkaConsumer kafkaConsumer) { + return new KafkaConsumerWrapper(); + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java new file mode 100644 index 0000000000..c557632184 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.cache; + +/** + * Kafka实例包装类 + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaConsumerWrapper { +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java new file mode 100644 index 0000000000..b4adb913fe --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.controller; + +import com.huaweicloud.sermant.kafka.cache.KafkaConsumerCache; +import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; + +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.util.Set; + +/** + * KafkaConsumer消费控制器 + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaConsumerController { + private KafkaConsumerController() { + } + + /** + * 关闭消费 + * + * @param kafkaConsumerWrapper 消费者包装实例 + * @param topics 消费主题 + */ + public static void disableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set topics) { + } + + /** + * 开启消费 + * + * @param kafkaConsumerWrapper 消费者包装实例 + * @param topics 消费主题 + */ + public static void enableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set topics) { + } + + /** + * 更新消费者缓存 + * + * @param kafkaConsumer 消费者实例 + */ + public static void updateConsumerCache(KafkaConsumer kafkaConsumer) { + KafkaConsumerCache.INSTANCE.updateCache(kafkaConsumer); + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java new file mode 100644 index 0000000000..e6bc3a518e --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.extension; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; + +/** + * KafkaConsumer处理器接口,供外部实现在创建KafkaConsumer时执行相应操作 + * + * @author lilai + * @since 2023-12-05 + */ +public interface KafkaConsumerHandler { + /** + * 拦截点之前的处理 + * + * @param context 拦截点上下文 + */ + void doBefore(ExecuteContext context); + + /** + * 拦截点之后的处理 + * + * @param context 拦截点上下文 + */ + void doAfter(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml similarity index 77% rename from sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/pom.xml rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml index a99c44aeb6..d5d39b878f 100644 --- a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml @@ -1,23 +1,23 @@ - - + - sermant-mq-consume-deny + sermant-mq-consume-prohibition com.huaweicloud.sermant 1.0.0 4.0.0 - kafka-consumer-plugin - kafka-consumer-plugin + kafka-1.x-plugin + 8 8 UTF-8 plugin false - 2.7.2 + 1.1.0 @@ -26,10 +26,6 @@ junit test - - com.huaweicloud.sermant - message-common - org.apache.kafka kafka-clients @@ -41,6 +37,11 @@ sermant-agentcore-core provided + + com.huaweicloud.sermant + consumer-controller + ${project.version} + @@ -59,4 +60,5 @@ - + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java similarity index 54% rename from sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java index 15b77a3f84..76aba57758 100644 --- a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -16,30 +16,24 @@ import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; -import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; -import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerConstructorInterceptor; -import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerMethodInterceptor; -import com.huaweicloud.sermant.kafka.matcher.KafkaConsumerMethodMatcher; +import com.huaweicloud.sermant.kafka.utils.KafkaEnhancementHelper; /** - * kakfa消费端消费declarer
+ * KafkaConsumer拦截点声明 * - * @author yuzl 俞真龙 - * @since 2022-10-09 + * @author lilai + * @since 2023-12-05 */ public class KafkaConsumerDeclarer extends AbstractPluginDeclarer { - private static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; - @Override public ClassMatcher getClassMatcher() { - return ClassMatcher.nameEquals(ENHANCE_CLASS); + return KafkaEnhancementHelper.getClassMatcher(); } @Override public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { - return new InterceptDeclarer[] { - InterceptDeclarer.build(KafkaConsumerMethodMatcher.matchKafkaMethod(), - new KafkaConsumerMethodInterceptor()), - InterceptDeclarer.build(MethodMatcher.isConstructor(), new KafkaConsumerConstructorInterceptor())}; + return new InterceptDeclarer[]{ + KafkaEnhancementHelper.getConstructorInterceptDeclarers(), + KafkaEnhancementHelper.getPollInterceptDeclarers()}; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java new file mode 100644 index 0000000000..7b42cd167e --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.interceptor; + +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import com.huaweicloud.sermant.kafka.controller.KafkaConsumerController; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerHandler; + +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.util.logging.Logger; + +/** + * KafkaConsumer构造方法的拦截器 + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaConsumerConstructorInterceptor extends AbstractInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private KafkaConsumerHandler handler; + + /** + * 带有KafkaConsumerHandler的构造方案 + * + * @param handler + */ + public KafkaConsumerConstructorInterceptor(KafkaConsumerHandler handler) { + this.handler = handler; + } + + /** + * 无参数构造方法 + */ + public KafkaConsumerConstructorInterceptor() { + } + + @Override + public ExecuteContext before(ExecuteContext context) { + if (handler != null) { + handler.doBefore(context); + } + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) { + if (handler != null) { + handler.doAfter(context); + } else { + processStartUpConsumption(); + } + + cacheKafkaConsumer(context); + return context; + } + + /** + * 缓存消费者实例 + * + * @param context 拦截点执行上下文 + */ + private void cacheKafkaConsumer(ExecuteContext context) { + Object kafkaConsumerObject = context.getObject(); + if (kafkaConsumerObject instanceof KafkaConsumer) { + KafkaConsumer consumer = (KafkaConsumer) kafkaConsumerObject; + KafkaConsumerController.updateConsumerCache(consumer); + LOGGER.info("KafkaConsumer has been cached by Sermant"); + } + } + + /** + * 处理启动过程中的禁止消费 + */ + private void processStartUpConsumption() { + } +} diff --git a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java similarity index 53% rename from sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java index a5ecae508f..ca3e90883f 100644 --- a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,22 +19,24 @@ import com.huaweicloud.sermant.core.common.LoggerFactory; import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; -import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; -import com.huaweicloud.sermant.message.common.config.DenyConsumeConfig; - -import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.logging.Logger; /** - * kafka consumer的构造方法拦截器,在kafka构造方法执行之后立即关闭KafkaConsumer
+ * KafkaConsumer构造方法的拦截器 * - * @author yuzl 俞真龙 - * @since 2022-10-09 + * @author lilai + * @since 2023-12-05 */ -public class KafkaConsumerConstructorInterceptor extends AbstractInterceptor { +public class KafkaConsumerPollInterceptor extends AbstractInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(); + /** + * 无参构造方法 + */ + public KafkaConsumerPollInterceptor() { + } + @Override public ExecuteContext before(ExecuteContext context) { return context; @@ -42,17 +44,11 @@ public ExecuteContext before(ExecuteContext context) { @Override public ExecuteContext after(ExecuteContext context) { - final DenyConsumeConfig pluginConfig = PluginConfigManager.getPluginConfig(DenyConsumeConfig.class); - if (pluginConfig.isEnableKafkaDeny()) { - Object object = context.getObject(); - if (object instanceof KafkaConsumer) { - KafkaConsumer consumer = (KafkaConsumer) object; - - // 方法执行结束后,直接关闭consumer - consumer.close(); - LOGGER.info("kafka consumer has been closed by sermant"); - } - } + return context; + } + + @Override + public ExecuteContext onThrow(ExecuteContext context) { return context; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java new file mode 100644 index 0000000000..7799fc277f --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.utils; + +import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; +import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerHandler; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerConstructorInterceptor; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerPollInterceptor; + +/** + * Kafka拦截点辅助类 + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaEnhancementHelper { + private static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; + + private static final int PARAM_COUNT = 3; + + private KafkaEnhancementHelper() { + } + + /** + * 获取Kafka拦截点的ClassMatcher + * + * @return 返回ClassMatcher + */ + public static ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + /** + * 获取Kafka拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getConstructorInterceptDeclarers() { + return InterceptDeclarer.build(getConstructorMethodMatcher(), new KafkaConsumerConstructorInterceptor()); + } + + /** + * 获取带有KafkaConsumerHandler的拦截声明器 + * + * @param handler Kafka消费者处理器 + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getConstructorInterceptDeclarers(KafkaConsumerHandler handler) { + return InterceptDeclarer.build(getConstructorMethodMatcher(), new KafkaConsumerConstructorInterceptor(handler)); + } + + /** + * 获取构造方法拦截的方法匹配器 + * + * @return 方法匹配器 + */ + private static MethodMatcher getConstructorMethodMatcher() { + return MethodMatcher.isConstructor().and(MethodMatcher.paramCountEquals(PARAM_COUNT)); + } + + /** + * 获取Poll方法拦截的方法匹配器 + * + * @return 方法匹配器 + */ + public static InterceptDeclarer getPollInterceptDeclarers() { + return InterceptDeclarer.build(getPollMethodMatcher(), new KafkaConsumerPollInterceptor()); + } + + private static MethodMatcher getPollMethodMatcher() { + return MethodMatcher.nameEquals("poll"); + } +} diff --git a/sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer similarity index 100% rename from sermant-plugins/sermant-mq-consume-deny/kafka-consumer-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer diff --git a/sermant-plugins/sermant-mq-consume-deny/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/pom.xml similarity index 62% rename from sermant-plugins/sermant-mq-consume-deny/pom.xml rename to sermant-plugins/sermant-mq-consume-prohibition/pom.xml index d68b62beb5..cb7b8b9147 100644 --- a/sermant-plugins/sermant-mq-consume-deny/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/pom.xml @@ -1,20 +1,20 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> sermant-plugins com.huaweicloud.sermant 1.0.0 4.0.0 - sermant-mq-consume-deny - sermant-mq-consume-deny + sermant-mq-consume-prohibition + sermant-mq-consume-prohibition pom ${pom.basedir}/../../.. - mq-consume-deny + mq-consume-prohibition @@ -34,25 +34,25 @@ true - kafka-consumer-plugin - rabbitmq-consumer-plugin - message-common + kafka-1.x-plugin + consumer-controller + config-service test - kafka-consumer-plugin - rabbitmq-consumer-plugin - message-common + kafka-1.x-plugin + consumer-controller + config-service release - kafka-consumer-plugin - rabbitmq-consumer-plugin - message-common + kafka-1.x-plugin + consumer-controller + config-service