From c3332fb926e0cf0e250145b3d769a791913f0d20 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Sat, 31 Aug 2024 18:59:09 +0800 Subject: [PATCH] add mq gray plugin declare/service Signed-off-by: chengyouling --- .../mq-config-common/pom.xml | 36 +++ .../mq/grayscale/config/BaseMessage.java | 0 .../mq/grayscale/config/ConsumeModeEnum.java | 0 .../mq/grayscale/config/GrayTagItem.java | 0 .../grayscale/config/MqGrayscaleConfig.java | 10 +- ...io.sermant.core.plugin.config.PluginConfig | 0 .../mq-config-service/pom.xml | 47 ++++ .../grayscale/listener/MqGrayConfigCache.java | 57 ++++ .../listener}/MqGrayConfigHandler.java | 8 +- .../listener}/MqGrayConfigListener.java | 2 +- .../service/MqGrayDynamicConfigService.java | 46 +++ ....sermant.core.plugin.service.PluginService | 17 ++ .../pom.xml | 17 +- .../config/RocketMqConsumerClientConfig.java} | 6 +- ...MqLitePullConsumerConstructorDeclarer.java | 53 ++++ ...etMqLitePullConsumerSubscribeDeclarer.java | 51 ++++ ...cketMqProducerGrayMessageHookDeclarer.java | 52 ++++ ...cketMqPullConsumerConstructorDeclarer.java | 53 ++++ .../RocketMqPullConsumerFetchDeclarer.java | 50 ++++ ...ullConsumerSubscriptionUpdateDeclarer.java | 48 ++++ ...cketMqPushConsumerConstructorDeclarer.java | 70 +++++ ...tMqPushConsumerSubscribeFetchDeclarer.java | 50 ++++ ...qSchedulerRebuildSubscriptionDeclarer.java | 48 ++++ .../RocketMqAbstractIntercepter.java | 71 +++++ ...ocketMqConsumerConstructorInterceptor.java | 58 ++++ ...qLitePullConsumerSubscribeInterceptor.java | 40 +++ ...tMqProducerGrayMessageHookInterceptor.java | 46 +++ .../RocketMqPullConsumerFetchInterceptor.java | 40 +++ ...ConsumerSubscriptionUpdateInterceptor.java | 75 +++++ ...PushConsumerSubscribeFetchInterceptor.java | 40 +++ ...hedulerRebuildSubscriptionInterceptor.java | 78 ++++++ .../RocketMqConsumerGroupAutoCheck.java | 264 ++++++++++++++++++ .../service/RocketMqGraySendMessageHook.java | 48 ++++ .../utils/RocketMqGrayscaleConfigUtils.java} | 70 ++--- .../utils/RocketMqSubscriptionDataUtils.java} | 97 ++++--- ....core.plugin.agent.declarer.PluginDeclarer | 25 ++ sermant-plugins/sermant-mq-grayscale/pom.xml | 12 +- 37 files changed, 1590 insertions(+), 95 deletions(-) create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin => mq-config-common}/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java (100%) rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin => mq-config-common}/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java (100%) rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin => mq-config-common}/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java (100%) rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin => mq-config-common}/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java (96%) rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin => mq-config-common}/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig (100%) create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigCache.java rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config => mq-config-service/src/main/java/io/sermant/mq/grayscale/listener}/MqGrayConfigHandler.java (90%) rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config => mq-config-service/src/main/java/io/sermant/mq/grayscale/listener}/MqGrayConfigListener.java (96%) create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin => mq-grayscale-rocketmq-plugin}/pom.xml (84%) rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java => mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java} (90%) create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPushConsumerSubscribeFetchInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGraySendMessageHook.java rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/MqGrayscaleConfigUtils.java => mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java} (85%) rename sermant-plugins/sermant-mq-grayscale/{sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/SubscriptionDataUtils.java => mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java} (79%) create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml new file mode 100644 index 0000000000..93be013be1 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + sermant-mq-grayscale + io.sermant + 1.0.0 + + + mq-config-common + + + 8 + 8 + + + + + io.sermant + sermant-agentcore-core + provided + + + junit + junit + test + + + org.mockito + mockito-core + test + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java similarity index 96% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java index 0eb33642ed..f2bd3dba00 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java @@ -18,7 +18,6 @@ import io.sermant.core.config.common.ConfigTypeKey; import io.sermant.core.plugin.config.PluginConfig; -import io.sermant.mq.grayscale.utils.SubscriptionDataUtils; import java.util.ArrayList; import java.util.HashMap; @@ -35,6 +34,11 @@ **/ @ConfigTypeKey("grayscale.mq.config") public class MqGrayscaleConfig implements PluginConfig { + /** + * afa symbol + */ + private static final String AFA_SYMBOL = "@"; + private boolean enabled = false; private List grayscale = new ArrayList<>(); @@ -122,12 +126,12 @@ public String buildAllTrafficTagInfoToStr() { StringBuilder sb = new StringBuilder(); for (GrayTagItem item : grayscale) { if (sb.length() > 0) { - sb.append(SubscriptionDataUtils.AFA_SYMBOL); + sb.append(AFA_SYMBOL); } sb.append(item.getConsumerGroupTag()); for (Map.Entry entry : item.getTrafficTag().entrySet()) { sb.append(entry.getKey()) - .append(SubscriptionDataUtils.AFA_SYMBOL) + .append(AFA_SYMBOL) .append(entry.getValue()); } } diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml new file mode 100644 index 0000000000..716fee9eef --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + sermant-mq-grayscale + io.sermant + 1.0.0 + + + mq-config-service + + + 8 + 8 + service + + + + + org.yaml + snakeyaml + + + io.sermant + sermant-agentcore-core + provided + + + io.sermant + mq-config-common + ${project.version} + provided + + + junit + junit + test + + + org.mockito + mockito-core + test + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigCache.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigCache.java new file mode 100644 index 0000000000..bfb5d7da49 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigCache.java @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.listener; + +import io.sermant.mq.grayscale.config.MqGrayscaleConfig; + +/** + * grayscale config cache + * + * @author chengyouling + * @since 2024-09-12 + **/ +public class MqGrayConfigCache { + private static MqGrayscaleConfig cacheConfig; + + private MqGrayConfigCache() { + } + + /** + * get cache mqGrayscaleConfig + * + * @return mqGrayscaleConfig + */ + public static MqGrayscaleConfig getCacheConfig() { + return cacheConfig; + } + + /** + * set cache mqGrayscaleConfig + * + * @param config mqGrayscaleConfig + */ + public static void setCacheConfig(MqGrayscaleConfig config) { + cacheConfig = config; + } + + /** + * clear cache mqGrayscaleConfig + */ + public static void clearCacheConfig() { + cacheConfig = new MqGrayscaleConfig(); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java similarity index 90% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java index 413ec90b59..1e043c0b00 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java @@ -14,13 +14,13 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.config; +package io.sermant.mq.grayscale.listener; import io.sermant.core.common.LoggerFactory; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; import io.sermant.core.utils.StringUtils; -import io.sermant.mq.grayscale.utils.MqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.config.MqGrayscaleConfig; import org.yaml.snakeyaml.DumperOptions; import org.yaml.snakeyaml.Yaml; @@ -64,7 +64,7 @@ public void handle(DynamicConfigEvent event) { return; } if (event.getEventType() == DynamicConfigEventType.DELETE) { - MqGrayscaleConfigUtils.resetGrayscaleConfig(); + MqGrayConfigCache.clearCacheConfig(); return; } if (!StringUtils.isEmpty(event.getContent())) { @@ -72,7 +72,7 @@ public void handle(DynamicConfigEvent event) { event.getGroup(), event.getContent())); MqGrayscaleConfig config = yaml.loadAs(event.getContent(), MqGrayscaleConfig.class); if (config != null) { - MqGrayscaleConfigUtils.setGrayscaleConfig(config, event.getEventType()); + MqGrayConfigCache.setCacheConfig(config); } } } diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java similarity index 96% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java index 3ec70e1b11..0300ddf695 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.config; +package io.sermant.mq.grayscale.listener; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; import io.sermant.core.service.dynamicconfig.common.DynamicConfigListener; diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java new file mode 100644 index 0000000000..ed8205777e --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.service; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.plugin.service.PluginService; +import io.sermant.core.plugin.subscribe.CommonGroupConfigSubscriber; +import io.sermant.core.plugin.subscribe.ConfigSubscriber; +import io.sermant.mq.grayscale.listener.MqGrayConfigListener; + +import java.util.logging.Logger; + +/** + * grayscale dynamic config service + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class MqGrayDynamicConfigService implements PluginService { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + @Override + public void start() { + ConfigSubscriber subscriber = new CommonGroupConfigSubscriber( + ConfigManager.getConfig(ServiceMeta.class).getService(), new MqGrayConfigListener(), + "mq-grayscale"); + subscriber.subscribe(); + LOGGER.info("Success to subscribe mq-grayscale config"); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService new file mode 100644 index 0000000000..ef5284eb25 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService @@ -0,0 +1,17 @@ +# +# Copyright (C) 2024-2024 Sermant Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +io.sermant.mq.grayscale.service.MqGrayDynamicConfigService \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml similarity index 84% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml index 8bf04ab08b..213f302f8f 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml @@ -9,7 +9,7 @@ 1.0.0 - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin 8 @@ -20,12 +20,14 @@ - com.alibaba - fastjson + io.sermant + mq-config-common + ${project.version} - org.yaml - snakeyaml + io.sermant + mq-config-service + ${project.version} io.sermant @@ -52,6 +54,11 @@ mockito-inline test + + com.alibaba + fastjson + test + diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java similarity index 90% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java index 4079594203..2a39d75ec5 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.config; +package io.sermant.mq.grayscale.rocketmq.config; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -24,7 +24,7 @@ * @author chengyouling * @since 2024-05-27 **/ -public class MqConsumerClientConfig { +public class RocketMqConsumerClientConfig { private String topic; private String address; @@ -40,7 +40,7 @@ public class MqConsumerClientConfig { * @param topic topic * @param consumerGroup consumerGroup */ - public MqConsumerClientConfig(String address, String topic, String consumerGroup) { + public RocketMqConsumerClientConfig(String address, String topic, String consumerGroup) { this.address = address; this.topic = topic; this.consumerGroup = consumerGroup; diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..1d3c8d6419 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor; + +/** + * lite pull consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqLitePullConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultLitePullConsumer"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + "java.lang.String", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java new file mode 100644 index 0000000000..60090c820f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqLitePullConsumerSubscribeInterceptor; + +/** + * lite pull consumer build consumer client config declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqLitePullConsumerSubscribeDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl"; + + private static final String METHOD_SUBSCRIBE = "subscribe"; + + private static final String METHOD_FETCH = "fetchMessageQueues"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build( + MethodMatcher.nameContains(METHOD_SUBSCRIBE, METHOD_FETCH), + new RocketMqLitePullConsumerSubscribeInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java new file mode 100644 index 0000000000..166411b68f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqProducerGrayMessageHookInterceptor; + +/** + * SendMessageHook builder declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqProducerGrayMessageHookDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl"; + + private static final String[] METHOD_PARAM_TYPES = { + "org.apache.rocketmq.client.producer.DefaultMQProducer", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqProducerGrayMessageHookInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..9d185c9d15 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor; + +/** + * pull consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqPullConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPullConsumer"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + "java.lang.String", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java new file mode 100644 index 0000000000..9471eafdc7 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPullConsumerFetchInterceptor; + +/** + * pull consumer build consumer client config declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPullConsumerFetchDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl"; + + private static final String METHOD_FETCH_SUBSCRIBE = "fetchSubscribeMessageQueues"; + + private static final String METHOD_FETCH_QUEUE = "fetchMessageQueuesInBalance"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_FETCH_QUEUE, METHOD_FETCH_SUBSCRIBE), + new RocketMqPullConsumerFetchInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java new file mode 100644 index 0000000000..6c22710dfb --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPullConsumerSubscriptionUpdateInterceptor; + +/** + * pull consumer TAG/SQL92 query message statement build declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPullConsumerSubscriptionUpdateDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl"; + + private static final String METHOD_NAME = "getSubscriptionData"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), + new RocketMqPullConsumerSubscriptionUpdateInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..9a4e2ea30f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor; + +/** + * push consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqPushConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"; + + private static final String PARAMETER_STRING = "java.lang.String"; + + private static final String PARAMETER_HOOK = "org.apache.rocketmq.remoting.RPCHook"; + + private static final String PARAMETER_STRATEGY = "org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy"; + + private static final String[] METHOD_FOUR_PARAM_TYPES = { + PARAMETER_STRING, + PARAMETER_STRING, + PARAMETER_HOOK, + PARAMETER_STRATEGY + }; + + private static final String[] METHOD_SIX_PARAMS = { + PARAMETER_STRING, + PARAMETER_STRING, + PARAMETER_HOOK, + PARAMETER_STRATEGY, + "boolean", + PARAMETER_STRING + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_FOUR_PARAM_TYPES) + .or(MethodMatcher.paramTypesEqual(METHOD_SIX_PARAMS))), + new RocketMqConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java new file mode 100644 index 0000000000..1df882cf7b --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPushConsumerSubscribeFetchInterceptor; + +/** + * push consumer build consumer client config declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPushConsumerSubscribeFetchDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl"; + + private static final String METHOD_FETCH_SUBSCRIBE = "fetchSubscribeMessageQueues"; + + private static final String METHOD_SUBSCRIBE = "subscribe"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_SUBSCRIBE, METHOD_FETCH_SUBSCRIBE), + new RocketMqPushConsumerSubscribeFetchInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java new file mode 100644 index 0000000000..0606c8e3dd --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqSchedulerRebuildSubscriptionInterceptor; + +/** + * scheduler update SQL92 query message statement declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqSchedulerRebuildSubscriptionDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.RebalanceImpl"; + + private static final String METHOD_NAME = "getSubscriptionInner"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), + new RocketMqSchedulerRebuildSubscriptionInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java new file mode 100644 index 0000000000..de6be26307 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +/** + * mq consumer abstract interceptor + * + * @author chengyouling + * @since 2024-09-04 + **/ +public abstract class RocketMqAbstractIntercepter extends AbstractInterceptor { + @Override + public ExecuteContext before(ExecuteContext context) throws Exception { + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) throws Exception { + RocketMqGrayscaleConfigUtils.checkAndUpdateCacheGrayConfig(); + if (!RocketMqGrayscaleConfigUtils.isPluginEnabled()) { + return context; + } + return doAfter(context); + } + + /** + * Handling after the intercept point + * + * @param context context + * @return context + * @throws Exception Exception + */ + protected abstract ExecuteContext doAfter(ExecuteContext context) throws Exception; + + /** + * build gray group name and set consumer client configs + * + * @param namesrvAddr namesrvAddr + * @param topic topic + * @param consumerGroup consumerGroup + */ + protected void buildGroupAndClientConfig(String namesrvAddr, String topic, String consumerGroup) { + String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag(); + if (StringUtils.isEmpty(grayGroupTag)) { + RocketMqConsumerGroupAutoCheck.setConsumerClientConfig(namesrvAddr, topic, consumerGroup); + } else { + RocketMqSubscriptionDataUtils.setGrayGroupTagChangeMap(namesrvAddr, topic, consumerGroup, true); + } + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java new file mode 100644 index 0000000000..14d05775f9 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * DefaultMQPushConsumer/DefaultLitePullConsumer/DefaultMQPullConsumer Constructor method interceptor + * gray scene reset consumerGroup with grayGroupTag + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqConsumerConstructorInterceptor extends AbstractInterceptor { + private final AtomicBoolean cacheConfigInit = new AtomicBoolean(false); + + @Override + public ExecuteContext before(ExecuteContext context) throws Exception { + if (cacheConfigInit.compareAndSet(false, true)) { + RocketMqGrayscaleConfigUtils.createMqGrayConfigs(); + } + if (!RocketMqGrayscaleConfigUtils.isPluginEnabled()) { + return context; + } + String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag(); + if (StringUtils.isEmpty(grayGroupTag)) { + return context; + } + String originGroup = (String) context.getArguments()[1]; + context.getArguments()[1] + = originGroup.contains("_" + grayGroupTag) ? originGroup : originGroup + "_" + grayGroupTag; + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) throws Exception { + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java new file mode 100644 index 0000000000..45612a9e21 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; + +/** + * LitePullConsumer subscribe/fetchMessageQueues method interceptor + * base scene recording namesrvAddr、topic、group info + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqLitePullConsumerSubscribeInterceptor extends RocketMqAbstractIntercepter { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultLitePullConsumerImpl litePullConsumerImpl = (DefaultLitePullConsumerImpl) context.getObject(); + DefaultLitePullConsumer pullConsumer = litePullConsumerImpl.getDefaultLitePullConsumer(); + buildGroupAndClientConfig(pullConsumer.getNamesrvAddr(), (String) context.getArguments()[0], + pullConsumer.getConsumerGroup()); + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java new file mode 100644 index 0000000000..e2f071a9c7 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqGraySendMessageHook; + +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; + +import java.lang.reflect.Method; +import java.util.Optional; + +/** + * SendMessageHook builder interceptor + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqProducerGrayMessageHookInterceptor extends RocketMqAbstractIntercepter { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultMQProducerImpl producer = (DefaultMQProducerImpl) context.getObject(); + Optional method = ReflectUtils.findMethod(producer.getClass(), "registerSendMessageHook", + new Class[]{SendMessageHook.class}); + if (method.isPresent()) { + method.get().invoke(producer, new RocketMqGraySendMessageHook()); + } + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java new file mode 100644 index 0000000000..69ae8c071b --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; + +/** + * PullConsumer fetchSubscribeMessageQueues/fetchMessageQueuesInBalance method interceptor + * base scene recording namesrvAddr、topic、group info + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPullConsumerFetchInterceptor extends RocketMqAbstractIntercepter { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultMQPullConsumer pullConsumer = + ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer(); + buildGroupAndClientConfig(pullConsumer.getNamesrvAddr(), (String) context.getArguments()[0], + pullConsumer.getConsumerGroup()); + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java new file mode 100644 index 0000000000..a174104284 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * update pull consumer subscription SQL92 query statement interceptor + * + * @author chengyouling + * @since 2024-07-27 + **/ +public class RocketMqPullConsumerSubscriptionUpdateInterceptor extends RocketMqAbstractIntercepter { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + SubscriptionData subscriptionData = (SubscriptionData) context.getResult(); + if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) { + return context; + } + Optional fieldValue = ReflectUtils.getFieldValue(context.getObject(), "mQClientFactory"); + if (!fieldValue.isPresent()) { + LOGGER.log(Level.SEVERE, "field mQClientFactory is not exist!"); + return context; + } + MQClientInstance instance = (MQClientInstance) fieldValue.get(); + buildSql92SubscriptionData(context, subscriptionData, instance); + return context; + } + + private void buildSql92SubscriptionData(ExecuteContext context, SubscriptionData subscriptionData, + MQClientInstance instance) { + DefaultMQPullConsumer pullConsumer + = ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer(); + String consumerGroup = pullConsumer.getConsumerGroup(); + String namesrvAddr = instance.getClientConfig().getNamesrvAddr(); + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + RocketMqConsumerGroupAutoCheck.setMqClientInstance(subscriptionData.getTopic(), consumerGroup, instance); + } + RocketMqConsumerGroupAutoCheck.queryGrayConsumerGroup(); + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(subscriptionData.getTopic(), + consumerGroup, namesrvAddr); + RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPushConsumerSubscribeFetchInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPushConsumerSubscribeFetchInterceptor.java new file mode 100644 index 0000000000..e1b50d7a3f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPushConsumerSubscribeFetchInterceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; + +/** + * PushConsumer fetchSubscribeMessageQueues/subscribe method interceptor + * base scene recording namesrvAddr、topic、group info + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPushConsumerSubscribeFetchInterceptor extends RocketMqAbstractIntercepter { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultMQPushConsumerImpl pushConsumerImpl = (DefaultMQPushConsumerImpl) context.getObject(); + DefaultMQPushConsumer pushConsumer = pushConsumerImpl.getDefaultMQPushConsumer(); + buildGroupAndClientConfig(pushConsumer.getNamesrvAddr(), (String) context.getArguments()[0], + pushConsumer.getConsumerGroup()); + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java new file mode 100644 index 0000000000..4de5affe2a --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.util.concurrent.ConcurrentMap; + +/** + * TAG/SQL92 query message statement interceptor + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbstractIntercepter { + private final Object lock = new Object(); + + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + ConcurrentMap map = (ConcurrentMap) context.getResult(); + RebalanceImpl balance = (RebalanceImpl) context.getObject(); + if (balance.getConsumerGroup() == null) { + return context; + } + for (SubscriptionData subscriptionData : map.values()) { + if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) { + continue; + } + buildSql92SubscriptionData(subscriptionData, balance); + } + return context; + } + + private void buildSql92SubscriptionData(SubscriptionData subscriptionData, RebalanceImpl balance) { + synchronized (lock) { + String topic = subscriptionData.getTopic(); + if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) { + return; + } + String consumerGroup = balance.getConsumerGroup(); + MQClientInstance instance = balance.getmQClientFactory(); + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance); + } + RocketMqConsumerGroupAutoCheck.queryGrayConsumerGroup(); + RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask(); + String namesrvAddr = balance.getmQClientFactory().getClientConfig().getNamesrvAddr(); + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, + namesrvAddr); + RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope); + + // update change flag when finished build substr + RocketMqSubscriptionDataUtils.resetTagChangeMap(namesrvAddr, topic, consumerGroup, false); + } + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java new file mode 100644 index 0000000000..1937a5945f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java @@ -0,0 +1,264 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.service; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.mq.grayscale.config.ConsumeModeEnum; +import io.sermant.mq.grayscale.config.GrayTagItem; +import io.sermant.mq.grayscale.rocketmq.config.RocketMqConsumerClientConfig; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * consumer group auto check service + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqConsumerGroupAutoCheck { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); + + /** + * gray consumer tags at last time + * key: namesrvAddr@topic@consumerGroup + * value: consumer gray tags + */ + private static final Map> LAST_TOPIC_GROUP_GRAY_TAG = new HashMap<>(); + + /** + * client configs for query consumer group + * key: namesrvAddr@topic@consumerGroup + * value: client config + */ + private static final Map CONSUMER_CLIENT_CONFIG_MAP = new HashMap<>(); + + private static final AtomicBoolean START_AUTO_CHECK = new AtomicBoolean(false); + + private static final long INITIAL_DELAY = 10L; + + private static final long ROCKET_MQ_READ_TIMEOUT = 5000L; + + private RocketMqConsumerGroupAutoCheck() { + } + + /** + * set current client MQClientInstance info + * + * @param topic topic + * @param consumerGroup consumerGroup + * @param mqClientInstance mqClientInstance + */ + public static void setMqClientInstance(String topic, String consumerGroup, MQClientInstance mqClientInstance) { + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, + mqClientInstance.getClientConfig().getNamesrvAddr()); + RocketMqConsumerClientConfig currentConfig = CONSUMER_CLIENT_CONFIG_MAP.get(subscribeScope); + if (currentConfig != null && currentConfig.getMqClientInstance() == null) { + currentConfig.setMqClientInstance(mqClientInstance); + } + } + + /** + * query gray consumer group + */ + public static void queryGrayConsumerGroup() { + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.AUTO && !START_AUTO_CHECK.get()) { + // sync to obtain current gray consumer group at AUTO mode before start scheduler check group + findGrayConsumerGroupAndUpdateGrayTags(); + } + } + + /** + * start scheduler check gray consumer is changed + */ + public static void startSchedulerCheckGroupTask() { + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.AUTO) { + if (START_AUTO_CHECK.compareAndSet(false, true)) { + EXECUTOR_SERVICE.scheduleWithFixedDelay( + RocketMqConsumerGroupAutoCheck::findGrayConsumerGroupAndUpdateGrayTags, INITIAL_DELAY, + RocketMqGrayscaleConfigUtils.getAutoCheckDelayTime(), TimeUnit.SECONDS); + } + } + } + + /** + * find gray consumer group and update gray tags + */ + public static void findGrayConsumerGroupAndUpdateGrayTags() { + if (CONSUMER_CLIENT_CONFIG_MAP.isEmpty()) { + return; + } + if (RocketMqGrayscaleConfigUtils.getConsumeType() != ConsumeModeEnum.AUTO) { + return; + } + if (!StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + return; + } + if (RocketMqGrayscaleConfigUtils.getGrayscaleConfigs() == null + || RocketMqGrayscaleConfigUtils.getGrayscaleConfigs().getGrayscale().isEmpty()) { + return; + } + for (RocketMqConsumerClientConfig clientConfig : CONSUMER_CLIENT_CONFIG_MAP.values()) { + if (clientConfig.getMqClientInstance() == null) { + continue; + } + Set grayTags = findGrayConsumerGroupAndGetTags(clientConfig); + LOGGER.log(Level.INFO,"[auto-check] current find gray tags: {0}.", grayTags); + resetAutoCheckGrayTagItems(grayTags, clientConfig); + } + } + + /** + * querying all consumer groups of Topic and Collecting grayGroupTag + * + * @param clientConfig clientConfig + * @return grayTags + */ + private static Set findGrayConsumerGroupAndGetTags(RocketMqConsumerClientConfig clientConfig) { + try { + MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl(); + String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi); + GroupList groupList = mqClientApi.queryTopicConsumeByWho(brokerAddress, clientConfig.getTopic(), + ROCKET_MQ_READ_TIMEOUT); + return getGrayTagsByConsumerGroup(groupList, brokerAddress, mqClientApi, + clientConfig.getConsumerGroup()); + } catch (MQClientException | InterruptedException | RemotingTimeoutException | RemotingSendRequestException + | RemotingConnectException | MQBrokerException e) { + LOGGER.log(Level.FINE, String.format(Locale.ENGLISH, "[auto-check] error, message: %s", + e.getMessage()), e); + } + return new HashSet<>(); + } + + private static Set getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress, + MQClientAPIImpl mqClientApi, String consumerGroup) { + Set grayTags = new HashSet<>(); + for (String group : groupList.getGroupList()) { + try { + List consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group, + ROCKET_MQ_READ_TIMEOUT); + if (consumerIds.isEmpty()) { + continue; + } + String grayTag = StringUtils.substringAfterLast(group, consumerGroup + "_"); + if (!StringUtils.isEmpty(grayTag)) { + grayTags.add(grayTag); + } + } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException + | MQBrokerException | InterruptedException e) { + LOGGER.warning(String.format(Locale.ENGLISH, "[auto-check] can not find ids in group: [%s].", + group)); + } + } + return grayTags; + } + + private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi) + throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, + InterruptedException, MQClientException { + TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(topic, ROCKET_MQ_READ_TIMEOUT, + false); + List brokerList = new ArrayList<>(); + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + brokerList.addAll(brokerData.getBrokerAddrs().values()); + } + + // cluster mode has multiple addresses, just select one + return brokerList.get(0); + } + + /** + * compare current query grayGroupTag with collected last time, reset autoCheckGrayTagItems + * + * @param grayTags grayTags + * @param clientConfig MqConsumerClientConfig + */ + private static void resetAutoCheckGrayTagItems(Set grayTags, RocketMqConsumerClientConfig clientConfig) { + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(clientConfig.getTopic(), + clientConfig.getConsumerGroup(), clientConfig.getAddress()); + if (grayTags.isEmpty()) { + if (LAST_TOPIC_GROUP_GRAY_TAG.containsKey(subscribeScope)) { + RocketMqSubscriptionDataUtils.resetAutoCheckGrayTagItems(new ArrayList<>(), clientConfig); + LAST_TOPIC_GROUP_GRAY_TAG.remove(subscribeScope); + } + return; + } + if (isGrayTagsChanged(grayTags, subscribeScope)) { + List grayTagItems = new ArrayList<>(); + for (String grayTag : grayTags) { + Optional item + = RocketMqGrayscaleConfigUtils.getGrayscaleConfigs().getGrayTagByGroupTag(grayTag); + item.ifPresent(grayTagItems::add); + } + LAST_TOPIC_GROUP_GRAY_TAG.put(subscribeScope, grayTags); + RocketMqSubscriptionDataUtils.resetAutoCheckGrayTagItems(grayTagItems, clientConfig); + } + } + + private static boolean isGrayTagsChanged(Set grayTags, String subscribeScope) { + HashSet currentGroups = new HashSet<>(grayTags); + Set lastTags = LAST_TOPIC_GROUP_GRAY_TAG.get(subscribeScope); + if (LAST_TOPIC_GROUP_GRAY_TAG.containsKey(subscribeScope)) { + currentGroups.removeAll(lastTags); + } + return !currentGroups.isEmpty() || grayTags.size() != lastTags.size(); + } + + /** + * set consumer client config + * + * @param address address + * @param topic topic + * @param consumerGroup consumerGroup + */ + public static void setConsumerClientConfig(String address, String topic, String consumerGroup) { + RocketMqConsumerClientConfig config = new RocketMqConsumerClientConfig(address, topic, consumerGroup); + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, address); + if (!CONSUMER_CLIENT_CONFIG_MAP.containsKey(subscribeScope)) { + CONSUMER_CLIENT_CONFIG_MAP.put(subscribeScope, config); + RocketMqSubscriptionDataUtils.setAutoCheckTagChangeMap(address, topic, consumerGroup, true); + } + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGraySendMessageHook.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGraySendMessageHook.java new file mode 100644 index 0000000000..dd4d39faf9 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGraySendMessageHook.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.service; + +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; + +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.common.message.Message; + +/** + * gray message send hook service + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqGraySendMessageHook implements SendMessageHook { + @Override + public String hookName() { + return "MqGraySendMessageHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + Message message = context.getMessage(); + + // set traffic tags in message by matching serviceMeta + RocketMqGrayscaleConfigUtils.setUserPropertyByServiceMeta(message); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/MqGrayscaleConfigUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java similarity index 85% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/MqGrayscaleConfigUtils.java rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java index 7bfe97f6a8..dd477822b6 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/MqGrayscaleConfigUtils.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java @@ -14,20 +14,19 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.utils; +package io.sermant.mq.grayscale.rocketmq.utils; import io.sermant.core.config.ConfigManager; import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.plugin.config.ServiceMeta; -import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; import io.sermant.mq.grayscale.config.ConsumeModeEnum; import io.sermant.mq.grayscale.config.GrayTagItem; import io.sermant.mq.grayscale.config.MqGrayscaleConfig; +import io.sermant.mq.grayscale.listener.MqGrayConfigCache; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,13 +42,14 @@ * @author chengyouling * @since 2024-06-03 */ -public class MqGrayscaleConfigUtils { +public class RocketMqGrayscaleConfigUtils { /** * serviceMeta info */ public static final Map MICRO_SERVICE_PROPERTIES = new HashMap<>(); - private static MqGrayscaleConfig cacheConfig = PluginConfigManager.getPluginConfig(MqGrayscaleConfig.class); + private static MqGrayscaleConfig cacheConfig + = PluginConfigManager.getPluginConfig(MqGrayscaleConfig.class); /** * all traffic tags that's been set, using for sql92 expression reset @@ -69,7 +69,7 @@ public class MqGrayscaleConfigUtils { } } - private MqGrayscaleConfigUtils() { + private RocketMqGrayscaleConfigUtils() { } /** @@ -113,33 +113,6 @@ public static String standardFormatGroupTag(String grayGroupTag) { return PATTERN.matcher(grayGroupTag.toLowerCase(Locale.ROOT)).replaceAll("-"); } - /** - * reset cache mqGrayscaleConfig - */ - public static void resetGrayscaleConfig() { - cacheConfig = new MqGrayscaleConfig(); - } - - /** - * set/update cache mqGrayscaleConfig - * - * @param config config - * @param eventType eventType - */ - public static void setGrayscaleConfig(MqGrayscaleConfig config, DynamicConfigEventType eventType) { - buildGrayTagsSet(config); - if (eventType == DynamicConfigEventType.CREATE) { - cacheConfig = config; - SubscriptionDataUtils.updateChangeFlag(); - return; - } - boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, config); - if (isAllowRefresh) { - cacheConfig.updateGrayscaleConfig(config); - SubscriptionDataUtils.updateChangeFlag(); - } - } - private static void buildGrayTagsSet(MqGrayscaleConfig config) { for (GrayTagItem item : config.getGrayscale()) { if (!item.getTrafficTag().isEmpty()) { @@ -179,6 +152,33 @@ public static boolean isPluginEnabled() { return cacheConfig.isEnabled(); } + /** + * create gray configs from config center + */ + public static void createMqGrayConfigs() { + if (MqGrayConfigCache.getCacheConfig() == null) { + return; + } + cacheConfig = MqGrayConfigCache.getCacheConfig(); + RocketMqSubscriptionDataUtils.updateChangeFlag(); + buildGrayTagsSet(MqGrayConfigCache.getCacheConfig()); + } + + /** + * check config is changed and update gray configs + */ + public static void checkAndUpdateCacheGrayConfig() { + if (MqGrayConfigCache.getCacheConfig() == null) { + return; + } + boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, MqGrayConfigCache.getCacheConfig()); + if (isAllowRefresh) { + cacheConfig.updateGrayscaleConfig(MqGrayConfigCache.getCacheConfig()); + RocketMqSubscriptionDataUtils.updateChangeFlag(); + buildGrayTagsSet(MqGrayConfigCache.getCacheConfig()); + } + } + /** * compare serviceMeta with mqGrayscaleConfig, set message property * @@ -213,10 +213,10 @@ public static MqGrayscaleConfig getGrayscaleConfigs() { */ public static List getGrayTagItemByExcludeGroupTags() { MqGrayscaleConfig mqGrayscaleConfig = getGrayscaleConfigs(); + List result = new ArrayList<>(); if (mqGrayscaleConfig.getBase() == null || mqGrayscaleConfig.getBase().getExcludeGroupTags().isEmpty()) { - return Collections.emptyList(); + return result; } - List result = new ArrayList<>(); for (String excludeGroupTag : mqGrayscaleConfig.getBase().getExcludeGroupTags()) { if (mqGrayscaleConfig.getGrayTagByGroupTag(excludeGroupTag).isPresent()) { result.add(mqGrayscaleConfig.getGrayTagByGroupTag(excludeGroupTag).get()); diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/SubscriptionDataUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java similarity index 79% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/SubscriptionDataUtils.java rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java index 172f68c560..72085cf82d 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/SubscriptionDataUtils.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java @@ -14,12 +14,12 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.utils; +package io.sermant.mq.grayscale.rocketmq.utils; import io.sermant.core.common.LoggerFactory; import io.sermant.mq.grayscale.config.ConsumeModeEnum; import io.sermant.mq.grayscale.config.GrayTagItem; -import io.sermant.mq.grayscale.config.MqConsumerClientConfig; +import io.sermant.mq.grayscale.rocketmq.config.RocketMqConsumerClientConfig; import io.sermant.mq.grayscale.config.MqGrayscaleConfig; import org.apache.commons.lang3.StringUtils; @@ -44,7 +44,7 @@ * @author chengyouling * @since 2024-06-03 */ -public class SubscriptionDataUtils { +public class RocketMqSubscriptionDataUtils { /** * tag consume message type */ @@ -98,7 +98,7 @@ public class SubscriptionDataUtils { private static final String AND_SPLICE_STR = " and "; - private SubscriptionDataUtils() { + private RocketMqSubscriptionDataUtils() { } /** @@ -130,15 +130,15 @@ private static String getStrForSets(Set tags) { * add current gray tags to sql92 expression * * @param originSubData originSubData - * @param addrTopicGroupKey addrTopicGroupKey + * @param subscribeScope subscribeScope * @return sql expression */ - public static String addGrayTagsToSql92Expression(String originSubData, String addrTopicGroupKey) { + public static String addGrayTagsToSql92Expression(String originSubData, String subscribeScope) { String originSubDataBak = originSubData; if (!StringUtils.isBlank(originSubDataBak)) { originSubDataBak = rebuildWithoutGrayTagSubData(originSubDataBak); } - String sql92Expression = buildSql92Expression(addrTopicGroupKey); + String sql92Expression = buildSql92Expression(subscribeScope); if (StringUtils.isBlank(sql92Expression)) { return originSubDataBak; } @@ -146,12 +146,12 @@ public static String addGrayTagsToSql92Expression(String originSubData, String a ? sql92Expression : originSubDataBak + AND_SPLICE_STR + sql92Expression; } - private static String buildSql92Expression(String addrTopicGroupKey) { + private static String buildSql92Expression(String subscribeScope) { StringBuilder sb = new StringBuilder(); - if (StringUtils.isEmpty(MqGrayscaleConfigUtils.getGrayGroupTag())) { + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { // base model return without exclude group message - if (MqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.BASE) { - List items = MqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.BASE) { + List items = RocketMqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); if (!items.isEmpty()) { sb.append(buildBaseConsumerSql92Expression(items)); } @@ -159,24 +159,24 @@ private static String buildSql92Expression(String addrTopicGroupKey) { } // auto model return without exclude group and current consume message gray group message - sb.append(buildBaseConsumerSql92Expression(getAutoTypeGrayTagItems(addrTopicGroupKey))); + sb.append(buildBaseConsumerSql92Expression(getAutoTypeGrayTagItems(subscribeScope))); } else { - MqGrayscaleConfig mqGrayscaleConfig = MqGrayscaleConfigUtils.getGrayscaleConfigs(); + MqGrayscaleConfig mqGrayscaleConfig = RocketMqGrayscaleConfigUtils.getGrayscaleConfigs(); Optional grayTagItem - = mqGrayscaleConfig.getGrayTagByGroupTag(MqGrayscaleConfigUtils.getGrayGroupTag()); + = mqGrayscaleConfig.getGrayTagByGroupTag(RocketMqGrayscaleConfigUtils.getGrayGroupTag()); if (grayTagItem.isPresent()) { sb.append(buildGrayConsumerSql92Expression(grayTagItem.get())); } else { LOGGER.warning(String.format(Locale.ENGLISH, "current gray group [%s] had not set grayscale, set it " - + "and restart service to valid.", MqGrayscaleConfigUtils.getGrayGroupTag())); + + "and restart service to valid.", RocketMqGrayscaleConfigUtils.getGrayGroupTag())); } } return sb.toString(); } - private static List getAutoTypeGrayTagItems(String addrTopicGroupKey) { - List excludeItems = MqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); - List autoDiscoveryGrayTags = AUTO_CHECK_GRAY_TAGS.get(addrTopicGroupKey); + private static List getAutoTypeGrayTagItems(String subscribeScope) { + List excludeItems = RocketMqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); + List autoDiscoveryGrayTags = AUTO_CHECK_GRAY_TAGS.get(subscribeScope); if (autoDiscoveryGrayTags != null && !autoDiscoveryGrayTags.isEmpty()) { excludeItems.addAll(autoDiscoveryGrayTags); } @@ -272,7 +272,7 @@ private static String rebuildWithoutGrayTagSubData(String originSubData) { } private static boolean containsGrayTags(String condition) { - for (String key : MqGrayscaleConfigUtils.getGrayTagsSet()) { + for (String key : RocketMqGrayscaleConfigUtils.getGrayTagsSet()) { if (condition.contains(key)) { return true; } @@ -284,16 +284,16 @@ private static boolean containsGrayTags(String condition) { * reset subString with gray tags * * @param subscriptionData subscriptionData - * @param addrTopicGroupKey addrTopicGroupKey + * @param subscribeScope subscribeScope */ - public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData, String addrTopicGroupKey) { + public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData, String subscribeScope) { String originSubData; if (EXPRESSION_TYPE_TAG.equals(subscriptionData.getExpressionType())) { originSubData = buildSql92ExpressionByTags(subscriptionData.getTagsSet()); } else { originSubData = subscriptionData.getSubString(); } - String newSubStr = addGrayTagsToSql92Expression(originSubData, addrTopicGroupKey); + String newSubStr = addGrayTagsToSql92Expression(originSubData, subscribeScope); if (StringUtils.isEmpty(newSubStr)) { newSubStr = SELECT_ALL_MESSAGE_SQL; } @@ -305,7 +305,7 @@ public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData subscriptionData.setSubString(newSubStr); subscriptionData.setSubVersion(System.currentTimeMillis()); LOGGER.warning(String.format(Locale.ENGLISH, "update [key: %s] SQL92 subscriptionData, originSubStr: " - + "[%s], newSubStr: [%s]", addrTopicGroupKey, originSubData, newSubStr)); + + "[%s], newSubStr: [%s]", subscribeScope, originSubData, newSubStr)); } /** @@ -314,14 +314,15 @@ public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData * @param grayTagItems grayTagItems * @param clientConfig clientConfig */ - public static void resetAutoCheckGrayTagItems(List grayTagItems, MqConsumerClientConfig clientConfig) { - String addrTopicGroupKey = buildAddrTopicGroupKey(clientConfig.getTopic(), clientConfig.getConsumerGroup(), + public static void resetAutoCheckGrayTagItems(List grayTagItems, + RocketMqConsumerClientConfig clientConfig) { + String subscribeScope = buildSubscribeScope(clientConfig.getTopic(), clientConfig.getConsumerGroup(), clientConfig.getAddress()); - AUTO_CHECK_GRAY_TAGS.remove(addrTopicGroupKey); + AUTO_CHECK_GRAY_TAGS.remove(subscribeScope); setAutoCheckTagChangeMap(clientConfig.getAddress(), clientConfig.getTopic(), clientConfig.getConsumerGroup(), true); if (!grayTagItems.isEmpty()) { - AUTO_CHECK_GRAY_TAGS.put(addrTopicGroupKey, grayTagItems); + AUTO_CHECK_GRAY_TAGS.put(subscribeScope, grayTagItems); } } @@ -334,8 +335,8 @@ public static void resetAutoCheckGrayTagItems(List grayTagItems, Mq * @param flag flag */ public static void setAutoCheckTagChangeMap(String namesrvAddr, String topic, String group, boolean flag) { - String addrTopicGroupKey = buildAddrTopicGroupKey(topic, group, namesrvAddr); - BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.put(addrTopicGroupKey, flag); + String subscribeScope = buildSubscribeScope(topic, group, namesrvAddr); + BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.put(subscribeScope, flag); } /** @@ -347,19 +348,19 @@ public static void setAutoCheckTagChangeMap(String namesrvAddr, String topic, St * @param flag flag */ public static void setGrayGroupTagChangeMap(String namesrvAddr, String topic, String group, boolean flag) { - String addrTopicGroupKey = buildAddrTopicGroupKey(topic, group, namesrvAddr); - GRAY_GROUP_TAG_CHANGE_MAP.put(addrTopicGroupKey, flag); + String subscribeScope = buildSubscribeScope(topic, group, namesrvAddr); + GRAY_GROUP_TAG_CHANGE_MAP.put(subscribeScope, flag); } /** - * using namesrvAddr/topic/consumerGroup build key + * using namesrvAddr/topic/consumerGroup build subscribe scope * * @param topic topic * @param consumerGroup consumerGroup * @param namesrvAddr namesrvAddr - * @return namesrvAddr@topic@consumerGroup + * @return subscribeScope */ - public static String buildAddrTopicGroupKey(String topic, String consumerGroup, String namesrvAddr) { + public static String buildSubscribeScope(String topic, String consumerGroup, String namesrvAddr) { String topicTemp = topic.contains(RETYPE) ? StringUtils.substringAfterLast(topic, RETYPE) : topic; String consumerGroupTemp = consumerGroup.contains(RETYPE) ? StringUtils.substringAfterLast(consumerGroup, RETYPE) : consumerGroup; @@ -374,14 +375,14 @@ public static String buildAddrTopicGroupKey(String topic, String consumerGroup, * @return changeFlag */ public static boolean getGrayTagChangeFlag(String topic, RebalanceImpl rebalance) { - String addrTopicGroupKey = buildAddrTopicGroupKey(topic, rebalance.getConsumerGroup(), + String subscribeScope = buildSubscribeScope(topic, rebalance.getConsumerGroup(), rebalance.getmQClientFactory().getClientConfig().getNamesrvAddr()); - if (StringUtils.isEmpty(MqGrayscaleConfigUtils.getGrayGroupTag())) { - return BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.get(addrTopicGroupKey) != null - && BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.get(addrTopicGroupKey); + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + return BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.get(subscribeScope) != null + && BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.get(subscribeScope); } - return GRAY_GROUP_TAG_CHANGE_MAP.get(addrTopicGroupKey) != null - && GRAY_GROUP_TAG_CHANGE_MAP.get(addrTopicGroupKey); + return GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope) != null + && GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope); } /** @@ -401,10 +402,24 @@ public static void updateChangeFlag() { * @param flag flag */ public static void resetTagChangeMap(String namesrvAddr, String topic, String consumerGroup, boolean flag) { - if (StringUtils.isEmpty(MqGrayscaleConfigUtils.getGrayGroupTag())) { + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { setAutoCheckTagChangeMap(namesrvAddr, topic, consumerGroup, flag); } else { setGrayGroupTagChangeMap(namesrvAddr, topic, consumerGroup, flag); } } + + /** + * check expressionType is inaccurate + * + * @param expressionType expressionType + * @return is inaccurate + */ + public static boolean isExpressionTypeInaccurate(String expressionType) { + if (!EXPRESSION_TYPE_SQL92.equals(expressionType) && !EXPRESSION_TYPE_TAG.equals(expressionType)) { + LOGGER.warning(String.format(Locale.ENGLISH, "can not process expressionType: %s", expressionType)); + return true; + } + return false; + } } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer new file mode 100644 index 0000000000..23a49a7aff --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -0,0 +1,25 @@ +# +# Copyright (C) 2024-2024 Sermant Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqProducerGrayMessageHookDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPullConsumerSubscriptionUpdateDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqSchedulerRebuildSubscriptionDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqLitePullConsumerSubscribeDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPullConsumerFetchDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPushConsumerSubscribeFetchDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqLitePullConsumerConstructorDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPullConsumerConstructorDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPushConsumerConstructorDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/pom.xml b/sermant-plugins/sermant-mq-grayscale/pom.xml index 92b4865d42..503066a637 100644 --- a/sermant-plugins/sermant-mq-grayscale/pom.xml +++ b/sermant-plugins/sermant-mq-grayscale/pom.xml @@ -41,19 +41,25 @@ true - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin + mq-config-common + mq-config-service test - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin + mq-config-common + mq-config-service release - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin + mq-config-common + mq-config-service