diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java index a31ca10ffc..faa4c0b628 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java @@ -19,6 +19,7 @@ import com.huaweicloud.sermant.core.common.LoggerFactory; import com.huaweicloud.sermant.core.exception.NetInterfacesCheckException; +import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; @@ -43,6 +44,8 @@ public class NetworkUtils { private static final String LOCAL_HOST_IP = "127.0.0.1"; + private static final String EMPTY_STR = ""; + private NetworkUtils() { } @@ -100,6 +103,47 @@ public static Optional getHostName() { } catch (UnknownHostException e) { return Optional.empty(); } + } + /** + * 获取Linux下的IP地址 + * + * @return IP地址 + */ + public static String getMachineIp() { + try { + for (Enumeration networkInterfaceEnumeration = NetworkInterface.getNetworkInterfaces(); + networkInterfaceEnumeration.hasMoreElements(); ) { + NetworkInterface networkInterface = networkInterfaceEnumeration.nextElement(); + String name = networkInterface.getName(); + if (name.contains("docker") || name.contains("lo")) { + continue; + } + String ip = resolveNetworkIp(networkInterface); + if (!EMPTY_STR.equals(ip)) { + return ip; + } + } + } catch (SocketException exception) { + LOGGER.warning("An exception occurred while getting the machine's IP address."); + } + LOGGER.severe("Can not acquire correct instance ip , it will be replaced by local ip!"); + return LOCAL_HOST_IP; + } + + private static String resolveNetworkIp(NetworkInterface networkInterface) { + for (Enumeration enumIpAddr = networkInterface.getInetAddresses(); + enumIpAddr.hasMoreElements(); ) { + InetAddress inetAddress = enumIpAddr.nextElement(); + if (!(inetAddress instanceof Inet4Address) || inetAddress.isLoopbackAddress()) { + continue; + } + String ipaddress = inetAddress.getHostAddress(); + if (!EMPTY_STR.equals(ipaddress) && !LOCAL_HOST_IP.equals(ipaddress)) { + // 取第一个符合要求的IP + return ipaddress; + } + } + return EMPTY_STR; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java b/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java index 6103767789..53c860a4d4 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java @@ -122,7 +122,7 @@ private void processDeleteEvent(DynamicConfigEvent event) { } private void executeProhibition() { - KafkaConsumerController.getConsumerCache() + KafkaConsumerController.getKafkaConsumerCache() .forEach(obj -> KafkaConsumerController.disableConsumption(obj, ProhibitionConfigManager.getKafkaProhibitionTopics())); } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml index 9a0d40ac7f..688460b6b9 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml @@ -10,24 +10,31 @@ 4.0.0 consumer-controller + + + 8 + 8 + 2.7.0 + 4.4 + + com.huaweicloud.sermant - sermant-agentcore-god - 1.0.0 - compile + sermant-agentcore-core + provided org.apache.kafka kafka-clients - 1.1.0 + ${kafka.client.version} provided + + org.apache.commons + commons-collections4 + ${collections4.version} + - - 8 - 8 - - \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java index 882f80bb79..cb8a4f80a5 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java @@ -16,8 +16,13 @@ package com.huaweicloud.sermant.kafka.cache; +import com.huaweicloud.sermant.core.config.ConfigManager; +import com.huaweicloud.sermant.core.plugin.config.ServiceMeta; +import com.huaweicloud.sermant.core.utils.NetworkUtils; + import org.apache.kafka.clients.consumer.KafkaConsumer; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -39,11 +44,6 @@ public enum KafkaConsumerCache { private final Set kafkaConsumerCache = new CopyOnWriteArraySet<>(); KafkaConsumerCache() { - init(); - } - - private void init() { - } /** @@ -60,7 +60,7 @@ public Set getCache() { * * @param kafkaConsumer 消费者实例 */ - public void updateCache(KafkaConsumer kafkaConsumer) { + public void addKafkaConsumer(KafkaConsumer kafkaConsumer) { kafkaConsumerCache.add(convert(kafkaConsumer)); } @@ -71,6 +71,18 @@ public void updateCache(KafkaConsumer kafkaConsumer) { * @return 消费者包装实例 */ private KafkaConsumerWrapper convert(KafkaConsumer kafkaConsumer) { - return new KafkaConsumerWrapper(); + KafkaConsumerWrapper wrapper = new KafkaConsumerWrapper(); + ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class); + wrapper.setKafkaConsumer(kafkaConsumer); + wrapper.setZone(serviceMeta.getZone()); + wrapper.setProject(serviceMeta.getProject()); + wrapper.setEnvironment(serviceMeta.getEnvironment()); + wrapper.setApplication(serviceMeta.getApplication()); + wrapper.setService(serviceMeta.getService()); + wrapper.setServerAddress(NetworkUtils.getMachineIp()); + wrapper.setOriginalTopics(new HashSet<>()); + wrapper.setOriginalPartitions(new HashSet<>()); + wrapper.setAssign(false); + return wrapper; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java index c557632184..ad547d65b6 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java @@ -16,6 +16,12 @@ package com.huaweicloud.sermant.kafka.cache; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Set; + /** * Kafka实例包装类 * @@ -23,4 +29,130 @@ * @since 2023-12-05 */ public class KafkaConsumerWrapper { + private KafkaConsumer kafkaConsumer; + + /** + * 宿主应用自身订阅的Topic + */ + private Set originalTopics; + + /** + * 是否使用assign方法指定订阅 + */ + private boolean isAssign; + + /** + * 使用assign方法指定的Topic和分区 + */ + private Collection originalPartitions; + + /** + * 当前消费者的服务所在可用区 + */ + private String zone; + + /** + * 当前消费者的服务所在可用区命名空间 + */ + private String project; + + /** + * 当前消费者的服务所在的环境 + */ + private String environment; + + /** + * 当前消费者的服务所在的应用 + */ + private String application; + + /** + * 当前消费者的所在服务的名称 + */ + private String service; + + /** + * 当前消费者的所在服务的IP + */ + private String serverAddress; + + public KafkaConsumer getKafkaConsumer() { + return kafkaConsumer; + } + + public void setKafkaConsumer(KafkaConsumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; + } + + public Set getOriginalTopics() { + return originalTopics; + } + + public void setOriginalTopics(Set originalTopics) { + this.originalTopics = originalTopics; + } + + public String getZone() { + return zone; + } + + public void setZone(String zone) { + this.zone = zone; + } + + public String getProject() { + return project; + } + + public void setProject(String project) { + this.project = project; + } + + public String getEnvironment() { + return environment; + } + + public void setEnvironment(String environment) { + this.environment = environment; + } + + public String getApplication() { + return application; + } + + public void setApplication(String application) { + this.application = application; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + + public String getServerAddress() { + return serverAddress; + } + + public void setServerAddress(String serverAddress) { + this.serverAddress = serverAddress; + } + + public boolean isAssign() { + return isAssign; + } + + public void setAssign(boolean assign) { + this.isAssign = assign; + } + + public Collection getOriginalPartitions() { + return originalPartitions; + } + + public void setOriginalPartitions(Collection originalPartitions) { + this.originalPartitions = originalPartitions; + } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java index 5ee4e267cc..426755af3d 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java @@ -19,9 +19,13 @@ import com.huaweicloud.sermant.kafka.cache.KafkaConsumerCache; import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; +import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import java.util.Collection; import java.util.Set; +import java.util.stream.Collectors; /** * KafkaConsumer消费控制器 @@ -34,30 +38,36 @@ private KafkaConsumerController() { } /** - * 关闭消费 + * 执行禁止消费 * * @param kafkaConsumerWrapper 消费者包装实例 - * @param topics 消费主题 + * @param prohibitionTopics 消费主题 */ - public static void disableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set topics) { - } + public static void disableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set prohibitionTopics) { + Set originalTopics = kafkaConsumerWrapper.getOriginalTopics(); + Collection originalPartitions = kafkaConsumerWrapper.getOriginalPartitions(); - /** - * 开启消费 - * - * @param kafkaConsumerWrapper 消费者包装实例 - * @param topics 消费主题 - */ - public static void enableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set topics) { + // 未订阅任何Topic,无需操作 + if (originalTopics.size() == 0) { + return; + } + + KafkaConsumer kafkaConsumer = kafkaConsumerWrapper.getKafkaConsumer(); + Collection subtractTopics = CollectionUtils.subtract(originalTopics, prohibitionTopics); + if (kafkaConsumerWrapper.isAssign()) { + kafkaConsumer.assign(originalPartitions.stream().filter(obj -> subtractTopics.contains(obj.topic())) + .collect(Collectors.toSet())); + } + kafkaConsumer.subscribe(subtractTopics); } /** - * 更新消费者缓存 + * 新增消费者缓存 * * @param kafkaConsumer 消费者实例 */ - public static void updateConsumerCache(KafkaConsumer kafkaConsumer) { - KafkaConsumerCache.INSTANCE.updateCache(kafkaConsumer); + public static void addKafkaConsumerCache(KafkaConsumer kafkaConsumer) { + KafkaConsumerCache.INSTANCE.addKafkaConsumer(kafkaConsumer); } /** @@ -65,7 +75,16 @@ public static void updateConsumerCache(KafkaConsumer kafkaConsumer) { * * @return 消费者缓存 */ - public static Set getConsumerCache() { + public static Set getKafkaConsumerCache() { return KafkaConsumerCache.INSTANCE.getCache(); } + + /** + * 移除消费者缓存 + * + * @param kafkaConsumerWrapper 消费者包装实例 + */ + public static void removeKafkaConsumeCache(KafkaConsumerWrapper kafkaConsumerWrapper) { + KafkaConsumerCache.INSTANCE.getCache().remove(kafkaConsumerWrapper); + } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerAssignHandler.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerAssignHandler.java new file mode 100644 index 0000000000..bb3f38bf44 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerAssignHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.extension; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; + +import java.util.Collection; + +/** + * KafkaConsumer assign方法处理器接口,供外部实现在KafkaConsumer订阅Topic时执行相应操作 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)} + * + * @author lilai + * @since 2023-12-05 + */ +public interface KafkaConsumerAssignHandler { + /** + * 拦截点之前的处理 + * + * @param context 拦截点上下文 + */ + void doBefore(ExecuteContext context); + + /** + * 拦截点之后的处理 + * + * @param context 拦截点上下文 + */ + void doAfter(ExecuteContext context); + + /** + * 拦截点异常时的处理 + * + * @param context 拦截点上下文 + */ + void doOnThrow(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerCloseHandler.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerCloseHandler.java new file mode 100644 index 0000000000..5f0132db6d --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerCloseHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.extension; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; + +import java.util.concurrent.TimeUnit; + +/** + * KafkaConsumer close方法处理器接口,供外部实现在KafkaConsumer关闭消费者时执行相应操作 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#close(long, TimeUnit)} + * + * @author lilai + * @since 2023-12-14 + */ +public interface KafkaConsumerCloseHandler { + /** + * 拦截点之前的处理 + * + * @param context 拦截点上下文 + */ + void doBefore(ExecuteContext context); + + /** + * 拦截点之后的处理 + * + * @param context 拦截点上下文 + */ + void doAfter(ExecuteContext context); + + /** + * 拦截点异常时的处理 + * + * @param context 拦截点上下文 + */ + void doOnThrow(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerConstructHandler.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerConstructHandler.java new file mode 100644 index 0000000000..bb34205fb3 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerConstructHandler.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.extension; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; + +/** + * KafkaConsumer构造方法处理器接口,供外部实现在创建KafkaConsumer时执行相应操作 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#KafkaConsumer(Properties, Deserializer, Deserializer)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#KafkaConsumer(Map, Deserializer, Deserializer)} + * + * @author lilai + * @since 2023-12-05 + */ +public interface KafkaConsumerConstructHandler { + /** + * 拦截点之前的处理 + * + * @param context 拦截点上下文 + */ + void doBefore(ExecuteContext context); + + /** + * 拦截点之后的处理 + * + * @param context 拦截点上下文 + */ + void doAfter(ExecuteContext context); + + /** + * 拦截点异常时的处理 + * + * @param context 拦截点上下文 + */ + void doOnThrow(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerSubscribeHandler.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerSubscribeHandler.java new file mode 100644 index 0000000000..3a5ad2e808 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerSubscribeHandler.java @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.extension; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + +import java.util.Collection; +import java.util.regex.Pattern; + +/** + * KafkaConsumer subscribe方法处理器接口,供外部实现在KafkaConsumer订阅Topic时执行相应操作 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)} + * + * @author lilai + * @since 2023-12-05 + */ +public interface KafkaConsumerSubscribeHandler { + /** + * 拦截点之前的处理 + * + * @param context 拦截点上下文 + */ + void doBefore(ExecuteContext context); + + /** + * 拦截点之后的处理 + * + * @param context 拦截点上下文 + */ + void doAfter(ExecuteContext context); + + /** + * 拦截点异常时的处理 + * + * @param context 拦截点上下文 + */ + void doOnThrow(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerUnSubscribeHandler.java similarity index 74% rename from sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java rename to sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerUnSubscribeHandler.java index e6bc3a518e..08c281c37c 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerUnSubscribeHandler.java @@ -19,12 +19,13 @@ import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; /** - * KafkaConsumer处理器接口,供外部实现在创建KafkaConsumer时执行相应操作 + * KafkaConsumer unsubscribe方法处理器接口,供外部实现在KafkaConsumer取消订阅时执行相应操作 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#unsubscribe()} * * @author lilai * @since 2023-12-05 */ -public interface KafkaConsumerHandler { +public interface KafkaConsumerUnSubscribeHandler { /** * 拦截点之前的处理 * @@ -38,4 +39,11 @@ public interface KafkaConsumerHandler { * @param context 拦截点上下文 */ void doAfter(ExecuteContext context); + + /** + * 拦截点异常时的处理 + * + * @param context 拦截点上下文 + */ + void doOnThrow(ExecuteContext context); } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/pom.xml similarity index 95% rename from sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/pom.xml index d5d39b878f..f10d0b793c 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/pom.xml @@ -9,7 +9,7 @@ 4.0.0 - kafka-1.x-plugin + kafka-1.x-2.x-plugin 8 @@ -17,7 +17,7 @@ UTF-8 plugin false - 1.1.0 + 2.7.0 diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java similarity index 76% rename from sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java index 76aba57758..5bf5e1e037 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/declarer/KafkaConsumerDeclarer.java @@ -33,7 +33,13 @@ public ClassMatcher getClassMatcher() { @Override public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { return new InterceptDeclarer[]{ - KafkaEnhancementHelper.getConstructorInterceptDeclarers(), - KafkaEnhancementHelper.getPollInterceptDeclarers()}; + KafkaEnhancementHelper.getPropertiesConstructorInterceptDeclarers(), + KafkaEnhancementHelper.getMapConstructorInterceptDeclarers(), + KafkaEnhancementHelper.getSubscribeInterceptDeclarers(), + KafkaEnhancementHelper.getAssignInterceptDeclarers(), + KafkaEnhancementHelper.getUnsubscribeInterceptDeclarers(), + KafkaEnhancementHelper.getPollInterceptDeclarers(), + KafkaEnhancementHelper.getCloseInterceptDeclarers() + }; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/AbstractKafkaTopicInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/AbstractKafkaTopicInterceptor.java new file mode 100644 index 0000000000..736b1f91c9 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/AbstractKafkaTopicInterceptor.java @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.interceptor; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; +import com.huaweicloud.sermant.kafka.controller.KafkaConsumerController; + +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.util.Optional; + +/** + * subscribe, unsubscribe 以及 assign 方法的抽象拦截器 + * + * @author lilai + * @since 2023-12-09 + */ +public abstract class AbstractKafkaTopicInterceptor extends AbstractInterceptor { + private static final String KAFKA_CONSUMER_CLASS_NAME = "org.apache.kafka.clients.consumer.KafkaConsumer"; + + private static final String CONSUMER_CONTROLLER_CLASS_NAME = "com.huaweicloud.sermant.kafka.controller" + + ".KafkaConsumerController"; + + /** + * 构造器 + */ + public AbstractKafkaTopicInterceptor() { + } + + @Override + public ExecuteContext before(ExecuteContext context) { + if (isInvokeBySermant()) { + return context; + } + return doBefore(context); + } + + @Override + public ExecuteContext after(ExecuteContext context) { + if (isInvokeBySermant()) { + return context; + } + + Object kafkaConsumerObject = context.getObject(); + if (!(kafkaConsumerObject instanceof KafkaConsumer)) { + return context; + } + + KafkaConsumer consumer = (KafkaConsumer) kafkaConsumerObject; + Optional optionalKafkaConsumerWrapper = KafkaConsumerController.getKafkaConsumerCache() + .stream() + .filter(obj -> obj.getKafkaConsumer().equals(consumer)) + .findFirst(); + if (!optionalKafkaConsumerWrapper.isPresent()) { + return context; + } + return doAfter(context, optionalKafkaConsumerWrapper.get()); + } + + @Override + public ExecuteContext onThrow(ExecuteContext context) { + if (isInvokeBySermant()) { + return context; + } + return doOnThrow(context); + } + + /** + * 判断是否Sermant发起的调用 + * + * @return 是否Sermant发起的调用 + */ + protected boolean isInvokeBySermant() { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + int stackTraceIdxMax = stackTrace.length - 1; + for (int i = 0; i < stackTrace.length; i++) { + if (!KAFKA_CONSUMER_CLASS_NAME.equals(stackTrace[i].getClassName())) { + continue; + } + if (i == stackTraceIdxMax) { + break; + } + if (KAFKA_CONSUMER_CLASS_NAME.equals(stackTrace[i + 1].getClassName())) { + continue; + } + if (CONSUMER_CONTROLLER_CLASS_NAME.equals(stackTrace[i + 1].getClassName())) { + return true; + } + } + return false; + } + + /** + * 前置触发点 + * + * @param context 执行上下文 + * @return 执行上下文 + */ + protected abstract ExecuteContext doBefore(ExecuteContext context); + + /** + * 后置触发点 + * + * @param context 执行上下文 + * @param kafkaConsumerWrapper + * @return 执行上下文 + */ + protected abstract ExecuteContext doAfter(ExecuteContext context, + KafkaConsumerWrapper kafkaConsumerWrapper); + + /** + * 异常触发点 + * + * @param context 执行上下文 + * @return 执行上下文 + */ + protected abstract ExecuteContext doOnThrow(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerAssignInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerAssignInterceptor.java new file mode 100644 index 0000000000..a367b96c38 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerAssignInterceptor.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.interceptor; + +import com.huaweicloud.sermant.config.ProhibitionConfigManager; +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; +import com.huaweicloud.sermant.kafka.controller.KafkaConsumerController; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerAssignHandler; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Logger; + +/** + * KafkaConsumer assign方法的拦截器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)} + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaConsumerAssignInterceptor extends AbstractKafkaTopicInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private KafkaConsumerAssignHandler handler; + + /** + * 带有KafkaConsumerAssignHandler的构造方法 + * + * @param handler assign方法拦截点处理器 + */ + public KafkaConsumerAssignInterceptor(KafkaConsumerAssignHandler handler) { + this.handler = handler; + } + + /** + * 无参构造方法 + */ + public KafkaConsumerAssignInterceptor() { + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + if (handler != null) { + handler.doBefore(context); + } + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context, KafkaConsumerWrapper kafkaConsumerWrapper) { + updateKafkaConsumerWrapper(kafkaConsumerWrapper); + if (handler != null) { + handler.doAfter(context); + } else { + LOGGER.info("Try to check if it is need to disable consumption after assignment..."); + + // 宿主应用每次订阅时都检查是否需要取消订阅其中的Topic + KafkaConsumerController.disableConsumption(kafkaConsumerWrapper, + ProhibitionConfigManager.getKafkaProhibitionTopics()); + } + return context; + } + + @Override + public ExecuteContext doOnThrow(ExecuteContext context) { + if (handler != null) { + handler.doOnThrow(context); + } + return context; + } + + private void updateKafkaConsumerWrapper(KafkaConsumerWrapper kafkaConsumerWrapper) { + kafkaConsumerWrapper.setAssign(true); + Set assignment = kafkaConsumerWrapper.getKafkaConsumer().assignment(); + kafkaConsumerWrapper.setOriginalPartitions(assignment); + Set assignedTopics = new HashSet<>(); + assignment.forEach(obj -> assignedTopics.add(obj.topic())); + kafkaConsumerWrapper.setOriginalTopics(assignedTopics); + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerCloseInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerCloseInterceptor.java new file mode 100644 index 0000000000..ad7f4ccf90 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerCloseInterceptor.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.interceptor; + +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; +import com.huaweicloud.sermant.kafka.controller.KafkaConsumerController; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerCloseHandler; + +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * KafkaConsumer close方法的拦截器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#close(long, TimeUnit)} + * + * @author lilai + * @since 2023-12-14 + */ +public class KafkaConsumerCloseInterceptor extends AbstractKafkaTopicInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private KafkaConsumerCloseHandler handler; + + /** + * 带有KafkaConsumerSubscribeHandler的构造方法 + * + * @param handler subscribe方法拦截点处理器 + */ + public KafkaConsumerCloseInterceptor(KafkaConsumerCloseHandler handler) { + this.handler = handler; + } + + /** + * 无参构造方法 + */ + public KafkaConsumerCloseInterceptor() { + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + if (handler != null) { + handler.doBefore(context); + } + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context, KafkaConsumerWrapper kafkaConsumerWrapper) { + if (handler != null) { + handler.doAfter(context); + } + + KafkaConsumerController.removeKafkaConsumeCache(kafkaConsumerWrapper); + LOGGER.info("Remove consumer cache after closing."); + return context; + } + + @Override + public ExecuteContext doOnThrow(ExecuteContext context) { + if (handler != null) { + handler.doOnThrow(context); + } + return context; + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerMapConstructorInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerMapConstructorInterceptor.java new file mode 100644 index 0000000000..0cfa1d54d3 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerMapConstructorInterceptor.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.interceptor; + +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import com.huaweicloud.sermant.kafka.controller.KafkaConsumerController; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerConstructHandler; +import com.huaweicloud.sermant.kafka.utils.MarkUtils; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.logging.Logger; + +/** + * KafkaConsumer构造方法的拦截器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#KafkaConsumer(Properties, Deserializer, Deserializer)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#KafkaConsumer(Map, Deserializer, Deserializer)} + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaConsumerMapConstructorInterceptor extends AbstractInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private KafkaConsumerConstructHandler handler; + + /** + * 带有KafkaConsumerConstructHandler的构造方法 + * + * @param handler 构造方法拦截点处理器 + */ + public KafkaConsumerMapConstructorInterceptor(KafkaConsumerConstructHandler handler) { + this.handler = handler; + } + + /** + * 无参数构造方法 + */ + public KafkaConsumerMapConstructorInterceptor() { + } + + @Override + public ExecuteContext before(ExecuteContext context) { + // 高版本Properties会调用Map方式,避免重复进入 + if (MarkUtils.getMark() != null) { + return context; + } + if (handler != null) { + handler.doBefore(context); + } + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) { + // 高版本Properties会调用Map方式,避免重复进入 + if (MarkUtils.getMark() != null) { + return context; + } + if (handler != null) { + handler.doAfter(context); + } + + cacheKafkaConsumer(context); + return context; + } + + @Override + public ExecuteContext onThrow(ExecuteContext context) { + // 高版本Properties会调用Map方式,避免重复进入 + if (MarkUtils.getMark() != null) { + return context; + } + if (handler != null) { + handler.doOnThrow(context); + } + return context; + } + + /** + * 缓存消费者实例 + * + * @param context 拦截点执行上下文 + */ + private void cacheKafkaConsumer(ExecuteContext context) { + Object kafkaConsumerObject = context.getObject(); + if (kafkaConsumerObject instanceof KafkaConsumer) { + KafkaConsumer consumer = (KafkaConsumer) kafkaConsumerObject; + KafkaConsumerController.addKafkaConsumerCache(consumer); + LOGGER.info("KafkaConsumer has been cached by Sermant."); + } + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java similarity index 68% rename from sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java index ca3e90883f..2bc1f62d23 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPollInterceptor.java @@ -20,10 +20,15 @@ import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import java.time.Duration; import java.util.logging.Logger; /** * KafkaConsumer构造方法的拦截器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)} * * @author lilai * @since 2023-12-05 @@ -31,6 +36,9 @@ public class KafkaConsumerPollInterceptor extends AbstractInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(); + private static final String ERROR_MESSAGE = "Consumer is not subscribed to any topics or " + + "assigned any partitions"; + /** * 无参构造方法 */ @@ -49,6 +57,12 @@ public ExecuteContext after(ExecuteContext context) { @Override public ExecuteContext onThrow(ExecuteContext context) { + if (context.getThrowable() instanceof IllegalStateException + && ERROR_MESSAGE.equals(context.getThrowable().getMessage())) { + context.changeThrowable(null); + context.changeResult(ConsumerRecords.empty()); + LOGGER.info("No consuming topic at this moment, catch exception and return empty result"); + } return context; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPropertiesConstructorInterceptor.java similarity index 65% rename from sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPropertiesConstructorInterceptor.java index 7b42cd167e..f8c0cb0707 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerConstructorInterceptor.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerPropertiesConstructorInterceptor.java @@ -20,40 +20,52 @@ import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; import com.huaweicloud.sermant.kafka.controller.KafkaConsumerController; -import com.huaweicloud.sermant.kafka.extension.KafkaConsumerHandler; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerConstructHandler; +import com.huaweicloud.sermant.kafka.utils.MarkUtils; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import java.util.Map; +import java.util.Properties; import java.util.logging.Logger; /** * KafkaConsumer构造方法的拦截器 + * {@link KafkaConsumer#KafkaConsumer(Properties, Deserializer, Deserializer)} + * {@link KafkaConsumer#KafkaConsumer(Map, Deserializer, Deserializer)} * * @author lilai * @since 2023-12-05 */ -public class KafkaConsumerConstructorInterceptor extends AbstractInterceptor { +public class KafkaConsumerPropertiesConstructorInterceptor extends AbstractInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(); - private KafkaConsumerHandler handler; + private KafkaConsumerConstructHandler handler; /** - * 带有KafkaConsumerHandler的构造方案 + * 带有KafkaConsumerConstructHandler的构造方法 * - * @param handler + * @param handler 构造方法拦截点处理器 */ - public KafkaConsumerConstructorInterceptor(KafkaConsumerHandler handler) { + public KafkaConsumerPropertiesConstructorInterceptor(KafkaConsumerConstructHandler handler) { this.handler = handler; } /** * 无参数构造方法 */ - public KafkaConsumerConstructorInterceptor() { + public KafkaConsumerPropertiesConstructorInterceptor() { } @Override public ExecuteContext before(ExecuteContext context) { + // 此处为兼容不同版本KafkaConsumer的构造方法 + // 低版本中Properties和Map方式二者互不影响,高版本Properties会调用Map方式 + if (MarkUtils.getMark() != null) { + return context; + } + MarkUtils.setMark(Boolean.TRUE); if (handler != null) { handler.doBefore(context); } @@ -64,8 +76,6 @@ public ExecuteContext before(ExecuteContext context) { public ExecuteContext after(ExecuteContext context) { if (handler != null) { handler.doAfter(context); - } else { - processStartUpConsumption(); } cacheKafkaConsumer(context); @@ -81,14 +91,8 @@ private void cacheKafkaConsumer(ExecuteContext context) { Object kafkaConsumerObject = context.getObject(); if (kafkaConsumerObject instanceof KafkaConsumer) { KafkaConsumer consumer = (KafkaConsumer) kafkaConsumerObject; - KafkaConsumerController.updateConsumerCache(consumer); - LOGGER.info("KafkaConsumer has been cached by Sermant"); + KafkaConsumerController.addKafkaConsumerCache(consumer); + LOGGER.info("KafkaConsumer has been cached by Sermant."); } } - - /** - * 处理启动过程中的禁止消费 - */ - private void processStartUpConsumption() { - } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerSubscribeInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerSubscribeInterceptor.java new file mode 100644 index 0000000000..bfafd6ced7 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerSubscribeInterceptor.java @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.interceptor; + +import com.huaweicloud.sermant.config.ProhibitionConfigManager; +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; +import com.huaweicloud.sermant.kafka.controller.KafkaConsumerController; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerSubscribeHandler; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + +import java.util.Collection; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +/** + * KafkaConsumer subscribe方法的拦截器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)} + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaConsumerSubscribeInterceptor extends AbstractKafkaTopicInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private KafkaConsumerSubscribeHandler handler; + + /** + * 带有KafkaConsumerSubscribeHandler的构造方法 + * + * @param handler subscribe方法拦截点处理器 + */ + public KafkaConsumerSubscribeInterceptor(KafkaConsumerSubscribeHandler handler) { + this.handler = handler; + } + + /** + * 无参构造方法 + */ + public KafkaConsumerSubscribeInterceptor() { + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + if (handler != null) { + handler.doBefore(context); + } + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context, KafkaConsumerWrapper kafkaConsumerWrapper) { + kafkaConsumerWrapper.setOriginalTopics(kafkaConsumerWrapper.getKafkaConsumer().subscription()); + if (handler != null) { + handler.doAfter(context); + } else { + LOGGER.info("Try to check if it is need to disable consumption after assignment..."); + + // 宿主应用每次订阅时都检查是否需要取消订阅其中的Topic + KafkaConsumerController.disableConsumption(kafkaConsumerWrapper, + ProhibitionConfigManager.getKafkaProhibitionTopics()); + } + return context; + } + + @Override + public ExecuteContext doOnThrow(ExecuteContext context) { + if (handler != null) { + handler.doOnThrow(context); + } + return context; + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerUnSubscribeInterceptor.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerUnSubscribeInterceptor.java new file mode 100644 index 0000000000..f9a5bb7749 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/interceptor/KafkaConsumerUnSubscribeInterceptor.java @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.interceptor; + +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerUnSubscribeHandler; + +import java.util.Collections; +import java.util.logging.Logger; + +/** + * KafkaConsumer unsubscribe方法的拦截器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#unsubscribe()} + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaConsumerUnSubscribeInterceptor extends AbstractKafkaTopicInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private KafkaConsumerUnSubscribeHandler handler; + + /** + * 带有KafkaConsumerUnSubscribeHandler的构造方法 + * + * @param handler unsubscribe方法拦截点处理器 + */ + public KafkaConsumerUnSubscribeInterceptor(KafkaConsumerUnSubscribeHandler handler) { + this.handler = handler; + } + + /** + * 无参构造方法 + */ + public KafkaConsumerUnSubscribeInterceptor() { + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + if (handler != null) { + handler.doBefore(context); + } + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context, + KafkaConsumerWrapper kafkaConsumerWrapper) { + updateKafkaConsumerWrapper(kafkaConsumerWrapper); + if (handler != null) { + handler.doAfter(context); + } + return context; + } + + @Override + public ExecuteContext doOnThrow(ExecuteContext context) { + if (handler != null) { + handler.doOnThrow(context); + } + return context; + } + + private void updateKafkaConsumerWrapper(KafkaConsumerWrapper kafkaConsumerWrapper) { + kafkaConsumerWrapper.setOriginalTopics(Collections.emptySet()); + kafkaConsumerWrapper.setOriginalPartitions(Collections.emptySet()); + kafkaConsumerWrapper.setAssign(false); + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java new file mode 100644 index 0000000000..c2c40cf405 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java @@ -0,0 +1,268 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.utils; + +import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; +import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerAssignHandler; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerCloseHandler; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerConstructHandler; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerSubscribeHandler; +import com.huaweicloud.sermant.kafka.extension.KafkaConsumerUnSubscribeHandler; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerAssignInterceptor; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerCloseInterceptor; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerMapConstructorInterceptor; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerPollInterceptor; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerPropertiesConstructorInterceptor; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerSubscribeInterceptor; +import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerUnSubscribeInterceptor; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.serialization.Deserializer; + +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +/** + * Kafka拦截点辅助类 + * + * @author lilai + * @since 2023-12-05 + */ +public class KafkaEnhancementHelper { + private static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; + + private static final int SUBSCRIBE_PARAM_COUNT = 2; + + private KafkaEnhancementHelper() { + } + + /** + * 获取Kafka拦截点的ClassMatcher + * + * @return 返回ClassMatcher + */ + public static ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + /** + * 获取Kafka properties构造方法拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getPropertiesConstructorInterceptDeclarers() { + return InterceptDeclarer.build(getPropertiesConstructorMethodMatcher(), + new KafkaConsumerPropertiesConstructorInterceptor()); + } + + /** + * 获取带有KafkaConsumerConstructHandler的Kafka properties构造方法拦截声明器 + * + * @param handler Kafka消费者处理器 + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getPropertiesConstructorInterceptDeclarers(KafkaConsumerConstructHandler handler) { + return InterceptDeclarer.build(getPropertiesConstructorMethodMatcher(), + new KafkaConsumerPropertiesConstructorInterceptor(handler)); + } + + /** + * 获取Kafka map构造方法拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getMapConstructorInterceptDeclarers() { + return InterceptDeclarer.build(getMapConstructorMethodMatcher(), + new KafkaConsumerMapConstructorInterceptor()); + } + + /** + * 获取带有KafkaConsumerConstructHandler的Kafka map构造方法拦截声明器 + * + * @param handler Kafka消费者处理器 + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getMapConstructorInterceptDeclarers(KafkaConsumerConstructHandler handler) { + return InterceptDeclarer.build(getMapConstructorMethodMatcher(), + new KafkaConsumerMapConstructorInterceptor(handler)); + } + + /** + * 获取Kafka subscribe方法拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getSubscribeInterceptDeclarers() { + return InterceptDeclarer.build(getSubScribeMethodMatcher(), new KafkaConsumerSubscribeInterceptor()); + } + + /** + * 获取获取带有KafkaConsumerSubscribeHandler的Kafka subscribe方法拦截点的拦截声明器 + * + * @param handler 处理器 + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getSubscribeInterceptDeclarers(KafkaConsumerSubscribeHandler handler) { + return InterceptDeclarer.build(getSubScribeMethodMatcher(), new KafkaConsumerSubscribeInterceptor(handler)); + } + + /** + * 获取Kafka assign方法拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getAssignInterceptDeclarers() { + return InterceptDeclarer.build(getAssignMethodMatcher(), new KafkaConsumerAssignInterceptor()); + } + + /** + * 获取获取带有KafkaConsumerAssignHandler的Kafka assign方法拦截点的拦截声明器 + * + * @param handler 处理器 + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getAssignInterceptDeclarers(KafkaConsumerAssignHandler handler) { + return InterceptDeclarer.build(getAssignMethodMatcher(), new KafkaConsumerAssignInterceptor(handler)); + } + + /** + * 获取Kafka unsubscribe方法拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getUnsubscribeInterceptDeclarers() { + return InterceptDeclarer.build(getUnSubscribeMethodMatcher(), new KafkaConsumerUnSubscribeInterceptor()); + } + + /** + * 获取Kafka unsubscribe方法拦截点的拦截声明器 + * + * @param handler 处理器 + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getUnsubscribeInterceptDeclarers(KafkaConsumerUnSubscribeHandler handler) { + return InterceptDeclarer.build(getUnSubscribeMethodMatcher(), new KafkaConsumerUnSubscribeInterceptor(handler)); + } + + /** + * 获取Kafka close方法拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getCloseInterceptDeclarers() { + return InterceptDeclarer.build(getCloseMethodMatcher(), new KafkaConsumerCloseInterceptor()); + } + + /** + * 获取Kafka close方法拦截点的拦截声明器 + * + * @param handler 处理器 + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getCloseInterceptDeclarers(KafkaConsumerCloseHandler handler) { + return InterceptDeclarer.build(getCloseMethodMatcher(), new KafkaConsumerCloseInterceptor(handler)); + } + + /** + * 获取Kafka poll方法拦截点的拦截声明器 + * + * @return 返回拦截声明器 + */ + public static InterceptDeclarer getPollInterceptDeclarers() { + return InterceptDeclarer.build(getPollMethodMatcher(), new KafkaConsumerPollInterceptor()); + } + + /** + * 获取Properties构造方法拦截的方法匹配器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#KafkaConsumer(Properties, Deserializer, Deserializer)} + * + * @return 方法匹配器 + */ + private static MethodMatcher getPropertiesConstructorMethodMatcher() { + return MethodMatcher.isConstructor().and(MethodMatcher.paramTypesEqual(Properties.class, + Deserializer.class, Deserializer.class)); + } + + /** + * 获取Map构造方法拦截的方法匹配器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#KafkaConsumer(Map, Deserializer, Deserializer)} + * + * @return 方法匹配器 + */ + private static MethodMatcher getMapConstructorMethodMatcher() { + return MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(Map.class, Deserializer.class, Deserializer.class)); + } + + /** + * 获取subscribe方法拦截的方法匹配器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)} + * + * @return 方法匹配器 + */ + private static MethodMatcher getSubScribeMethodMatcher() { + return MethodMatcher.nameEquals("subscribe").and(MethodMatcher.paramCountEquals(SUBSCRIBE_PARAM_COUNT)); + } + + /** + * 获取assign方法拦截的方法匹配器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)} + * + * @return 方法匹配器 + */ + private static MethodMatcher getAssignMethodMatcher() { + return MethodMatcher.nameEquals("assign"); + } + + /** + * 获取unsubscribe方法拦截的方法匹配器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#unsubscribe()} + * + * @return 方法匹配器 + */ + private static MethodMatcher getUnSubscribeMethodMatcher() { + return MethodMatcher.nameEquals("unsubscribe"); + } + + /** + * 获取Poll方法拦截的方法匹配器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)} + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)} + * + * @return 方法匹配器 + */ + private static MethodMatcher getPollMethodMatcher() { + return MethodMatcher.nameEquals("poll"); + } + + /** + * 获取Close方法拦截的方法匹配器 + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#close(long, TimeUnit)} + * + * @return 方法匹配器 + */ + private static MethodMatcher getCloseMethodMatcher() { + return MethodMatcher.nameEquals("close").and(MethodMatcher.paramTypesEqual(Long.TYPE, TimeUnit.class)); + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/MarkUtils.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/MarkUtils.java new file mode 100644 index 0000000000..06137b1436 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/MarkUtils.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.utils; + +/** + * 线程变量标记类,用于兼容kafka不同版本不重复进入构造函数 + * + * @author lilai + * @since 2023-12-09 + */ +public class MarkUtils { + private static final ThreadLocal MARK = new ThreadLocal<>(); + + private MarkUtils() { + } + + /** + * 获取线程变量 + * + * @return 线程变量 + */ + public static Boolean getMark() { + return MARK.get(); + } + + /** + * 存入线程变量 + * + * @param value 线程变量 + */ + public static void setMark(Boolean value) { + MARK.set(value); + } +} \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer new file mode 100644 index 0000000000..e0223e9d09 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -0,0 +1,17 @@ +# +# Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.huaweicloud.sermant.kafka.declarer.KafkaConsumerDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java deleted file mode 100644 index 7799fc277f..0000000000 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/KafkaEnhancementHelper.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.huaweicloud.sermant.kafka.utils; - -import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; -import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; -import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; -import com.huaweicloud.sermant.kafka.extension.KafkaConsumerHandler; -import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerConstructorInterceptor; -import com.huaweicloud.sermant.kafka.interceptor.KafkaConsumerPollInterceptor; - -/** - * Kafka拦截点辅助类 - * - * @author lilai - * @since 2023-12-05 - */ -public class KafkaEnhancementHelper { - private static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; - - private static final int PARAM_COUNT = 3; - - private KafkaEnhancementHelper() { - } - - /** - * 获取Kafka拦截点的ClassMatcher - * - * @return 返回ClassMatcher - */ - public static ClassMatcher getClassMatcher() { - return ClassMatcher.nameEquals(ENHANCE_CLASS); - } - - /** - * 获取Kafka拦截点的拦截声明器 - * - * @return 返回拦截声明器 - */ - public static InterceptDeclarer getConstructorInterceptDeclarers() { - return InterceptDeclarer.build(getConstructorMethodMatcher(), new KafkaConsumerConstructorInterceptor()); - } - - /** - * 获取带有KafkaConsumerHandler的拦截声明器 - * - * @param handler Kafka消费者处理器 - * @return 返回拦截声明器 - */ - public static InterceptDeclarer getConstructorInterceptDeclarers(KafkaConsumerHandler handler) { - return InterceptDeclarer.build(getConstructorMethodMatcher(), new KafkaConsumerConstructorInterceptor(handler)); - } - - /** - * 获取构造方法拦截的方法匹配器 - * - * @return 方法匹配器 - */ - private static MethodMatcher getConstructorMethodMatcher() { - return MethodMatcher.isConstructor().and(MethodMatcher.paramCountEquals(PARAM_COUNT)); - } - - /** - * 获取Poll方法拦截的方法匹配器 - * - * @return 方法匹配器 - */ - public static InterceptDeclarer getPollInterceptDeclarers() { - return InterceptDeclarer.build(getPollMethodMatcher(), new KafkaConsumerPollInterceptor()); - } - - private static MethodMatcher getPollMethodMatcher() { - return MethodMatcher.nameEquals("poll"); - } -} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer deleted file mode 100644 index a6ddfda59c..0000000000 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer +++ /dev/null @@ -1 +0,0 @@ -com.huaweicloud.sermant.kafka.declarer.KafkaConsumerDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/pom.xml index cb7b8b9147..1cf5db2e8c 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/pom.xml @@ -34,7 +34,7 @@ true - kafka-1.x-plugin + kafka-1.x-2.x-plugin consumer-controller config-service @@ -42,7 +42,7 @@ test - kafka-1.x-plugin + kafka-1.x-2.x-plugin consumer-controller config-service @@ -50,7 +50,7 @@ release - kafka-1.x-plugin + kafka-1.x-2.x-plugin consumer-controller config-service