diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java index a31ca10ffc..faa4c0b628 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/com/huaweicloud/sermant/core/utils/NetworkUtils.java @@ -19,6 +19,7 @@ import com.huaweicloud.sermant.core.common.LoggerFactory; import com.huaweicloud.sermant.core.exception.NetInterfacesCheckException; +import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; @@ -43,6 +44,8 @@ public class NetworkUtils { private static final String LOCAL_HOST_IP = "127.0.0.1"; + private static final String EMPTY_STR = ""; + private NetworkUtils() { } @@ -100,6 +103,47 @@ public static Optional getHostName() { } catch (UnknownHostException e) { return Optional.empty(); } + } + /** + * 获取Linux下的IP地址 + * + * @return IP地址 + */ + public static String getMachineIp() { + try { + for (Enumeration networkInterfaceEnumeration = NetworkInterface.getNetworkInterfaces(); + networkInterfaceEnumeration.hasMoreElements(); ) { + NetworkInterface networkInterface = networkInterfaceEnumeration.nextElement(); + String name = networkInterface.getName(); + if (name.contains("docker") || name.contains("lo")) { + continue; + } + String ip = resolveNetworkIp(networkInterface); + if (!EMPTY_STR.equals(ip)) { + return ip; + } + } + } catch (SocketException exception) { + LOGGER.warning("An exception occurred while getting the machine's IP address."); + } + LOGGER.severe("Can not acquire correct instance ip , it will be replaced by local ip!"); + return LOCAL_HOST_IP; + } + + private static String resolveNetworkIp(NetworkInterface networkInterface) { + for (Enumeration enumIpAddr = networkInterface.getInetAddresses(); + enumIpAddr.hasMoreElements(); ) { + InetAddress inetAddress = enumIpAddr.nextElement(); + if (!(inetAddress instanceof Inet4Address) || inetAddress.isLoopbackAddress()) { + continue; + } + String ipaddress = inetAddress.getHostAddress(); + if (!EMPTY_STR.equals(ipaddress) && !LOCAL_HOST_IP.equals(ipaddress)) { + // 取第一个符合要求的IP + return ipaddress; + } + } + return EMPTY_STR; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java b/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java index 6103767789..dab38587fe 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/config-service/src/main/java/com/huaweicloud/sermant/mq/dynamicconfig/MqConfigListener.java @@ -91,12 +91,12 @@ public void process(DynamicConfigEvent event) { private void processCreateOrUpdateEvent(DynamicConfigEvent event) { if (GLOBAL_CONFIG_KEY.equals(event.getKey())) { ProhibitionConfigManager.updateGlobalConfig(yaml.loadAs(event.getContent(), ProhibitionConfig.class)); - executeProhibition(); + markProhibition(); } if ((LOCAL_CONFIG_KEY_PREFIX + ConfigManager.getConfig(ServiceMeta.class).getService()).equals( event.getKey())) { ProhibitionConfigManager.updateLocalConfig(yaml.loadAs(event.getContent(), ProhibitionConfig.class)); - executeProhibition(); + markProhibition(); } LOGGER.info(String.format(Locale.ROOT, "Update mq-consume-prohibition config, current config: %s", ProhibitionConfigManager.printConfig())); @@ -110,21 +110,20 @@ private void processCreateOrUpdateEvent(DynamicConfigEvent event) { private void processDeleteEvent(DynamicConfigEvent event) { if (GLOBAL_CONFIG_KEY.equals(event.getKey())) { ProhibitionConfigManager.updateGlobalConfig(new ProhibitionConfig()); - executeProhibition(); + markProhibition(); } if ((LOCAL_CONFIG_KEY_PREFIX + ConfigManager.getConfig(ServiceMeta.class).getService()).equals( event.getKey())) { ProhibitionConfigManager.updateLocalConfig(new ProhibitionConfig()); - executeProhibition(); + markProhibition(); } LOGGER.info(String.format(Locale.ROOT, "Delete mq-consume-prohibition config, current config: %s", ProhibitionConfigManager.printConfig())); } - private void executeProhibition() { - KafkaConsumerController.getConsumerCache() - .forEach(obj -> KafkaConsumerController.disableConsumption(obj, - ProhibitionConfigManager.getKafkaProhibitionTopics())); + private void markProhibition() { + KafkaConsumerController.getKafkaConsumerCache().values() + .forEach(obj -> obj.getIsNeedExecute().set(true)); } /** diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml index 9a0d40ac7f..688460b6b9 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/pom.xml @@ -10,24 +10,31 @@ 4.0.0 consumer-controller + + + 8 + 8 + 2.7.0 + 4.4 + + com.huaweicloud.sermant - sermant-agentcore-god - 1.0.0 - compile + sermant-agentcore-core + provided org.apache.kafka kafka-clients - 1.1.0 + ${kafka.client.version} provided + + org.apache.commons + commons-collections4 + ${collections4.version} + - - 8 - 8 - - \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java index 882f80bb79..05c0678d96 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerCache.java @@ -16,10 +16,16 @@ package com.huaweicloud.sermant.kafka.cache; +import com.huaweicloud.sermant.core.config.ConfigManager; +import com.huaweicloud.sermant.core.plugin.config.ServiceMeta; +import com.huaweicloud.sermant.core.utils.NetworkUtils; + import org.apache.kafka.clients.consumer.KafkaConsumer; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; /** * KafkaConsumer缓存 @@ -36,14 +42,9 @@ public enum KafkaConsumerCache { /** * 消费者缓存 */ - private final Set kafkaConsumerCache = new CopyOnWriteArraySet<>(); + private final Map kafkaConsumerCache = new ConcurrentHashMap<>(); KafkaConsumerCache() { - init(); - } - - private void init() { - } /** @@ -51,7 +52,7 @@ private void init() { * * @return 消费者缓存 */ - public Set getCache() { + public Map getCache() { return kafkaConsumerCache; } @@ -60,8 +61,8 @@ public Set getCache() { * * @param kafkaConsumer 消费者实例 */ - public void updateCache(KafkaConsumer kafkaConsumer) { - kafkaConsumerCache.add(convert(kafkaConsumer)); + public void addKafkaConsumer(KafkaConsumer kafkaConsumer) { + kafkaConsumerCache.put(kafkaConsumer.hashCode(), convert(kafkaConsumer)); } /** @@ -71,6 +72,19 @@ public void updateCache(KafkaConsumer kafkaConsumer) { * @return 消费者包装实例 */ private KafkaConsumerWrapper convert(KafkaConsumer kafkaConsumer) { - return new KafkaConsumerWrapper(); + KafkaConsumerWrapper wrapper = new KafkaConsumerWrapper(); + ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class); + wrapper.setKafkaConsumer(kafkaConsumer); + wrapper.setZone(serviceMeta.getZone()); + wrapper.setProject(serviceMeta.getProject()); + wrapper.setEnvironment(serviceMeta.getEnvironment()); + wrapper.setApplication(serviceMeta.getApplication()); + wrapper.setService(serviceMeta.getService()); + wrapper.setServerAddress(NetworkUtils.getMachineIp()); + wrapper.setOriginalTopics(new HashSet<>()); + wrapper.setOriginalPartitions(new HashSet<>()); + wrapper.setAssign(false); + wrapper.setNeedExecute(new AtomicBoolean(false)); + return wrapper; } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java index c557632184..857231b014 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/cache/KafkaConsumerWrapper.java @@ -16,6 +16,13 @@ package com.huaweicloud.sermant.kafka.cache; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + /** * Kafka实例包装类 * @@ -23,4 +30,143 @@ * @since 2023-12-05 */ public class KafkaConsumerWrapper { + private KafkaConsumer kafkaConsumer; + + /** + * 宿主应用自身订阅的Topic + */ + private Set originalTopics; + + /** + * 是否使用assign方法指定订阅 + */ + private boolean isAssign; + + /** + * 使用assign方法指定的Topic和分区 + */ + private Collection originalPartitions; + + /** + * 是否需要在poll前处理执行禁止消费 + */ + private AtomicBoolean isNeedExecute; + + /** + * 当前消费者的服务所在可用区 + */ + private String zone; + + /** + * 当前消费者的服务所在可用区命名空间 + */ + private String project; + + /** + * 当前消费者的服务所在的环境 + */ + private String environment; + + /** + * 当前消费者的服务所在的应用 + */ + private String application; + + /** + * 当前消费者的所在服务的名称 + */ + private String service; + + /** + * 当前消费者的所在服务的IP + */ + private String serverAddress; + + public KafkaConsumer getKafkaConsumer() { + return kafkaConsumer; + } + + public void setKafkaConsumer(KafkaConsumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; + } + + public Set getOriginalTopics() { + return originalTopics; + } + + public void setOriginalTopics(Set originalTopics) { + this.originalTopics = originalTopics; + } + + public String getZone() { + return zone; + } + + public void setZone(String zone) { + this.zone = zone; + } + + public String getProject() { + return project; + } + + public void setProject(String project) { + this.project = project; + } + + public String getEnvironment() { + return environment; + } + + public void setEnvironment(String environment) { + this.environment = environment; + } + + public String getApplication() { + return application; + } + + public void setApplication(String application) { + this.application = application; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + + public String getServerAddress() { + return serverAddress; + } + + public void setServerAddress(String serverAddress) { + this.serverAddress = serverAddress; + } + + public boolean isAssign() { + return isAssign; + } + + public void setAssign(boolean assign) { + this.isAssign = assign; + } + + public Collection getOriginalPartitions() { + return originalPartitions; + } + + public void setOriginalPartitions(Collection originalPartitions) { + this.originalPartitions = originalPartitions; + } + + public AtomicBoolean getIsNeedExecute() { + return isNeedExecute; + } + + public void setNeedExecute(AtomicBoolean needExecute) { + this.isNeedExecute = needExecute; + } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java index 5ee4e267cc..68201e4324 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/controller/KafkaConsumerController.java @@ -19,9 +19,14 @@ import com.huaweicloud.sermant.kafka.cache.KafkaConsumerCache; import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper; +import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import java.util.Collection; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * KafkaConsumer消费控制器 @@ -34,30 +39,36 @@ private KafkaConsumerController() { } /** - * 关闭消费 + * 执行禁止消费 * * @param kafkaConsumerWrapper 消费者包装实例 - * @param topics 消费主题 + * @param prohibitionTopics 消费主题 */ - public static void disableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set topics) { - } + public static void disableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set prohibitionTopics) { + Set originalTopics = kafkaConsumerWrapper.getOriginalTopics(); + Collection originalPartitions = kafkaConsumerWrapper.getOriginalPartitions(); - /** - * 开启消费 - * - * @param kafkaConsumerWrapper 消费者包装实例 - * @param topics 消费主题 - */ - public static void enableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set topics) { + // 未订阅任何Topic,无需操作 + if (originalTopics.size() == 0) { + return; + } + + KafkaConsumer kafkaConsumer = kafkaConsumerWrapper.getKafkaConsumer(); + Collection subtractTopics = CollectionUtils.subtract(originalTopics, prohibitionTopics); + if (kafkaConsumerWrapper.isAssign()) { + kafkaConsumer.assign(originalPartitions.stream().filter(obj -> subtractTopics.contains(obj.topic())) + .collect(Collectors.toSet())); + } + kafkaConsumer.subscribe(subtractTopics); } /** - * 更新消费者缓存 + * 新增消费者缓存 * * @param kafkaConsumer 消费者实例 */ - public static void updateConsumerCache(KafkaConsumer kafkaConsumer) { - KafkaConsumerCache.INSTANCE.updateCache(kafkaConsumer); + public static void addKafkaConsumerCache(KafkaConsumer kafkaConsumer) { + KafkaConsumerCache.INSTANCE.addKafkaConsumer(kafkaConsumer); } /** @@ -65,7 +76,16 @@ public static void updateConsumerCache(KafkaConsumer kafkaConsumer) { * * @return 消费者缓存 */ - public static Set getConsumerCache() { + public static Map getKafkaConsumerCache() { return KafkaConsumerCache.INSTANCE.getCache(); } + + /** + * 移除消费者缓存 + * + * @param kafkaConsumerWrapper 消费者包装实例 + */ + public static void removeKafkaConsumeCache(KafkaConsumerWrapper kafkaConsumerWrapper) { + KafkaConsumerCache.INSTANCE.getCache().remove(kafkaConsumerWrapper); + } } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java index e6bc3a518e..d9b913d459 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java +++ b/sermant-plugins/sermant-mq-consume-prohibition/consumer-controller/src/main/java/com/huaweicloud/sermant/kafka/extension/KafkaConsumerHandler.java @@ -19,7 +19,7 @@ import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; /** - * KafkaConsumer处理器接口,供外部实现在创建KafkaConsumer时执行相应操作 + * KafkaConsumer拦截点处理器接口,供外部实现在KafkaConsumer的拦截点执行相应操作 * * @author lilai * @since 2023-12-05 @@ -38,4 +38,11 @@ public interface KafkaConsumerHandler { * @param context 拦截点上下文 */ void doAfter(ExecuteContext context); + + /** + * 拦截点异常时的处理 + * + * @param context 拦截点上下文 + */ + void doOnThrow(ExecuteContext context); } diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/pom.xml similarity index 95% rename from sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml rename to sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/pom.xml index d5d39b878f..f10d0b793c 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/pom.xml @@ -9,7 +9,7 @@ 4.0.0 - kafka-1.x-plugin + kafka-1.x-2.x-plugin 8 @@ -17,7 +17,7 @@ UTF-8 plugin false - 1.1.0 + 2.7.0 diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/InvokeUtils.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/InvokeUtils.java new file mode 100644 index 0000000000..24cea503ac --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/InvokeUtils.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.utils; + +/** + * 是否是Sermant发起的调用判断类 + * + * @author lilai + * @since 2023-12-18 + */ +public class InvokeUtils { + private static final String KAFKA_CONSUMER_CLASS_NAME = "org.apache.kafka.clients.consumer.KafkaConsumer"; + + private static final String CONSUMER_CONTROLLER_CLASS_NAME = "com.huaweicloud.sermant.kafka.controller" + + ".KafkaConsumerController"; + + private InvokeUtils() { + + } + + /** + * 判断是否Sermant发起的调用 + * + * @return 是否Sermant发起的调用 + */ + public static boolean isInvokeBySermant() { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + int stackTraceIdxMax = stackTrace.length - 1; + for (int i = 0; i < stackTrace.length; i++) { + if (!KAFKA_CONSUMER_CLASS_NAME.equals(stackTrace[i].getClassName())) { + continue; + } + if (i == stackTraceIdxMax) { + break; + } + if (KAFKA_CONSUMER_CLASS_NAME.equals(stackTrace[i + 1].getClassName())) { + continue; + } + if (CONSUMER_CONTROLLER_CLASS_NAME.equals(stackTrace[i + 1].getClassName())) { + return true; + } + } + return false; + } +} diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/MarkUtils.java b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/MarkUtils.java new file mode 100644 index 0000000000..06137b1436 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/java/com/huaweicloud/sermant/kafka/utils/MarkUtils.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.kafka.utils; + +/** + * 线程变量标记类,用于兼容kafka不同版本不重复进入构造函数 + * + * @author lilai + * @since 2023-12-09 + */ +public class MarkUtils { + private static final ThreadLocal MARK = new ThreadLocal<>(); + + private MarkUtils() { + } + + /** + * 获取线程变量 + * + * @return 线程变量 + */ + public static Boolean getMark() { + return MARK.get(); + } + + /** + * 存入线程变量 + * + * @param value 线程变量 + */ + public static void setMark(Boolean value) { + MARK.set(value); + } +} \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer new file mode 100644 index 0000000000..e0223e9d09 --- /dev/null +++ b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-2.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -0,0 +1,17 @@ +# +# Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.huaweicloud.sermant.kafka.declarer.KafkaConsumerDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer deleted file mode 100644 index a6ddfda59c..0000000000 --- a/sermant-plugins/sermant-mq-consume-prohibition/kafka-1.x-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer +++ /dev/null @@ -1 +0,0 @@ -com.huaweicloud.sermant.kafka.declarer.KafkaConsumerDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-consume-prohibition/pom.xml b/sermant-plugins/sermant-mq-consume-prohibition/pom.xml index cb7b8b9147..1cf5db2e8c 100644 --- a/sermant-plugins/sermant-mq-consume-prohibition/pom.xml +++ b/sermant-plugins/sermant-mq-consume-prohibition/pom.xml @@ -34,7 +34,7 @@ true - kafka-1.x-plugin + kafka-1.x-2.x-plugin consumer-controller config-service @@ -42,7 +42,7 @@ test - kafka-1.x-plugin + kafka-1.x-2.x-plugin consumer-controller config-service @@ -50,7 +50,7 @@ release - kafka-1.x-plugin + kafka-1.x-2.x-plugin consumer-controller config-service