diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml new file mode 100644 index 0000000000..93be013be1 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + sermant-mq-grayscale + io.sermant + 1.0.0 + + + mq-config-common + + + 8 + 8 + + + + + io.sermant + sermant-agentcore-core + provided + + + junit + junit + test + + + org.mockito + mockito-core + test + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigCache.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigCache.java new file mode 100644 index 0000000000..c63653507d --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigCache.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.config; + +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; +import io.sermant.mq.grayscale.config.rocketmq.RocketMqConfigUtils; + +/** + * grayscale config cache + * + * @author chengyouling + * @since 2024-09-12 + **/ +public class MqGrayConfigCache { + private static MqGrayscaleConfig cacheConfig = PluginConfigManager.getPluginConfig(MqGrayscaleConfig.class);; + + private MqGrayConfigCache() { + } + + /** + * get cache mqGrayscaleConfig + * + * @return mqGrayscaleConfig + */ + public static MqGrayscaleConfig getCacheConfig() { + return cacheConfig; + } + + /** + * set cache mqGrayscaleConfig + * + * @param config mqGrayscaleConfig + */ + public static void setCacheConfig(MqGrayscaleConfig config, DynamicConfigEventType eventType) { + if (eventType == DynamicConfigEventType.CREATE) { + cacheConfig = config; + RocketMqConfigUtils.updateChangeFlag(); + RocketMqConfigUtils.recordTrafficTagsSet(config); + return; + } + boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, config); + if (isAllowRefresh) { + cacheConfig.updateGrayscaleConfig(MqGrayConfigCache.getCacheConfig()); + RocketMqConfigUtils.updateChangeFlag(); + RocketMqConfigUtils.recordTrafficTagsSet(config); + } + } + + /** + * clear cache mqGrayscaleConfig + */ + public static void clearCacheConfig() { + cacheConfig = new MqGrayscaleConfig(); + RocketMqConfigUtils.updateChangeFlag(); + } + + /** + * only traffic label changes allow refresh tag change map to rebuild SQL92 query statement, + * because if the serviceMeta changed, the gray consumer cannot be matched and becomes a base consumer + * so, if you need to change the env tag, restart all services. + * + * @param resource cache config + * @param target cache config + * @return boolean + */ + private static boolean isAllowRefreshChangeFlag(MqGrayscaleConfig resource, MqGrayscaleConfig target) { + if (resource.isEnabled() != target.isEnabled()) { + return true; + } + if (resource.isBaseExcludeGroupTagsChanged(target)) { + return true; + } + if (resource.isConsumerModeChanged(target)) { + return true; + } + return !resource.buildAllTrafficTagInfoToStr().equals(target.buildAllTrafficTagInfoToStr()); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java similarity index 96% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java index 0eb33642ed..f2bd3dba00 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java @@ -18,7 +18,6 @@ import io.sermant.core.config.common.ConfigTypeKey; import io.sermant.core.plugin.config.PluginConfig; -import io.sermant.mq.grayscale.utils.SubscriptionDataUtils; import java.util.ArrayList; import java.util.HashMap; @@ -35,6 +34,11 @@ **/ @ConfigTypeKey("grayscale.mq.config") public class MqGrayscaleConfig implements PluginConfig { + /** + * afa symbol + */ + private static final String AFA_SYMBOL = "@"; + private boolean enabled = false; private List grayscale = new ArrayList<>(); @@ -122,12 +126,12 @@ public String buildAllTrafficTagInfoToStr() { StringBuilder sb = new StringBuilder(); for (GrayTagItem item : grayscale) { if (sb.length() > 0) { - sb.append(SubscriptionDataUtils.AFA_SYMBOL); + sb.append(AFA_SYMBOL); } sb.append(item.getConsumerGroupTag()); for (Map.Entry entry : item.getTrafficTag().entrySet()) { sb.append(entry.getKey()) - .append(SubscriptionDataUtils.AFA_SYMBOL) + .append(AFA_SYMBOL) .append(entry.getValue()); } } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/rocketmq/RocketMqConfigUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/rocketmq/RocketMqConfigUtils.java new file mode 100644 index 0000000000..727dd88f1b --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/rocketmq/RocketMqConfigUtils.java @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.config.rocketmq; + +import io.sermant.mq.grayscale.config.GrayTagItem; +import io.sermant.mq.grayscale.config.MqGrayscaleConfig; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * rocketmq config utils + * + * @author chengyouling + * @since 2024-09-13 + */ +public class RocketMqConfigUtils { + /** + * base instance subscript gray tag change flags + * key: namesrvAddr@topic@consumerGroup + * value: change flag + */ + private static final Map BASE_GROUP_TAG_CHANGE_MAP = new ConcurrentHashMap<>(); + + /** + * gray instance subscript gray tag change flags + * key: namesrvAddr@topic@consumerGroup + * value: change flag + */ + private static final Map GRAY_GROUP_TAG_CHANGE_MAP = new ConcurrentHashMap<>(); + + /** + * all traffic tags that's been set, using for sql92 expression reset + */ + private static final Set GRAY_TAGS_SET = new HashSet<>(); + + /** + * set base consumer address@topic@group correspondents change flag + * + * @param subscribeScope subscribeScope + * @param flag flag + */ + public static void setBaseGroupTagChangeMap(String subscribeScope, boolean flag) { + BASE_GROUP_TAG_CHANGE_MAP.put(subscribeScope, flag); + } + + /** + * set gray consumer address@topic@group correspondents change flag + * + * @param subscribeScope subscribeScope + * @param flag flag + */ + public static void setGrayGroupTagChangeMap(String subscribeScope, boolean flag) { + GRAY_GROUP_TAG_CHANGE_MAP.put(subscribeScope, flag); + } + + /** + * get base consumer address@topic@group correspondents change flag + * + * @param subscribeScope subscribeScope + * @return changeFlag + */ + public static boolean getBaseGroupTagChangeMap(String subscribeScope) { + return BASE_GROUP_TAG_CHANGE_MAP.get(subscribeScope) != null && BASE_GROUP_TAG_CHANGE_MAP.get(subscribeScope); + } + + /** + * get gray consumer address@topic@group correspondents change flag + * + * @param subscribeScope subscribeScope + * @return changeFlag + */ + public static boolean getGrayGroupTagChangeMap(String subscribeScope) { + return GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope) != null && GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope); + } + + /** + * update all consumer gray tag change flag + */ + public static void updateChangeFlag() { + BASE_GROUP_TAG_CHANGE_MAP.replaceAll((k, v) -> true); + GRAY_GROUP_TAG_CHANGE_MAP.replaceAll((k, v) -> true); + } + + /** + * records traffic labels for historical and current configuration settings + * + * @param config config + */ + public static void recordTrafficTagsSet(MqGrayscaleConfig config) { + for (GrayTagItem item : config.getGrayscale()) { + if (!item.getTrafficTag().isEmpty()) { + GRAY_TAGS_SET.addAll(item.getTrafficTag().keySet()); + } + } + } + + /** + * get all config set gray tags + * + * @return all config set gray tags + */ + public static Set getGrayTagsSet() { + return GRAY_TAGS_SET; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig similarity index 100% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml new file mode 100644 index 0000000000..716fee9eef --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + sermant-mq-grayscale + io.sermant + 1.0.0 + + + mq-config-service + + + 8 + 8 + service + + + + + org.yaml + snakeyaml + + + io.sermant + sermant-agentcore-core + provided + + + io.sermant + mq-config-common + ${project.version} + provided + + + junit + junit + test + + + org.mockito + mockito-core + test + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java similarity index 89% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java index 413ec90b59..90cc799b03 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java @@ -14,13 +14,14 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.config; +package io.sermant.mq.grayscale.listener; import io.sermant.core.common.LoggerFactory; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; import io.sermant.core.utils.StringUtils; -import io.sermant.mq.grayscale.utils.MqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.config.MqGrayConfigCache; +import io.sermant.mq.grayscale.config.MqGrayscaleConfig; import org.yaml.snakeyaml.DumperOptions; import org.yaml.snakeyaml.Yaml; @@ -64,7 +65,7 @@ public void handle(DynamicConfigEvent event) { return; } if (event.getEventType() == DynamicConfigEventType.DELETE) { - MqGrayscaleConfigUtils.resetGrayscaleConfig(); + MqGrayConfigCache.clearCacheConfig(); return; } if (!StringUtils.isEmpty(event.getContent())) { @@ -72,7 +73,7 @@ public void handle(DynamicConfigEvent event) { event.getGroup(), event.getContent())); MqGrayscaleConfig config = yaml.loadAs(event.getContent(), MqGrayscaleConfig.class); if (config != null) { - MqGrayscaleConfigUtils.setGrayscaleConfig(config, event.getEventType()); + MqGrayConfigCache.setCacheConfig(config, event.getEventType()); } } } diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java similarity index 96% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java index 3ec70e1b11..0300ddf695 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.config; +package io.sermant.mq.grayscale.listener; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; import io.sermant.core.service.dynamicconfig.common.DynamicConfigListener; diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java new file mode 100644 index 0000000000..ed8205777e --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.service; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.plugin.service.PluginService; +import io.sermant.core.plugin.subscribe.CommonGroupConfigSubscriber; +import io.sermant.core.plugin.subscribe.ConfigSubscriber; +import io.sermant.mq.grayscale.listener.MqGrayConfigListener; + +import java.util.logging.Logger; + +/** + * grayscale dynamic config service + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class MqGrayDynamicConfigService implements PluginService { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + @Override + public void start() { + ConfigSubscriber subscriber = new CommonGroupConfigSubscriber( + ConfigManager.getConfig(ServiceMeta.class).getService(), new MqGrayConfigListener(), + "mq-grayscale"); + subscriber.subscribe(); + LOGGER.info("Success to subscribe mq-grayscale config"); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService new file mode 100644 index 0000000000..ef5284eb25 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService @@ -0,0 +1,17 @@ +# +# Copyright (C) 2024-2024 Sermant Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +io.sermant.mq.grayscale.service.MqGrayDynamicConfigService \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml similarity index 90% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml index 8bf04ab08b..b6710a633f 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml @@ -9,7 +9,7 @@ 1.0.0 - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin 8 @@ -20,12 +20,9 @@ - com.alibaba - fastjson - - - org.yaml - snakeyaml + io.sermant + mq-config-common + ${project.version} io.sermant @@ -52,6 +49,11 @@ mockito-inline test + + com.alibaba + fastjson + test + diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java similarity index 90% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java index 4079594203..2a39d75ec5 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.config; +package io.sermant.mq.grayscale.rocketmq.config; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -24,7 +24,7 @@ * @author chengyouling * @since 2024-05-27 **/ -public class MqConsumerClientConfig { +public class RocketMqConsumerClientConfig { private String topic; private String address; @@ -40,7 +40,7 @@ public class MqConsumerClientConfig { * @param topic topic * @param consumerGroup consumerGroup */ - public MqConsumerClientConfig(String address, String topic, String consumerGroup) { + public RocketMqConsumerClientConfig(String address, String topic, String consumerGroup) { this.address = address; this.topic = topic; this.consumerGroup = consumerGroup; diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..1d3c8d6419 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor; + +/** + * lite pull consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqLitePullConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultLitePullConsumer"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + "java.lang.String", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java new file mode 100644 index 0000000000..60090c820f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqLitePullConsumerSubscribeInterceptor; + +/** + * lite pull consumer build consumer client config declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqLitePullConsumerSubscribeDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl"; + + private static final String METHOD_SUBSCRIBE = "subscribe"; + + private static final String METHOD_FETCH = "fetchMessageQueues"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build( + MethodMatcher.nameContains(METHOD_SUBSCRIBE, METHOD_FETCH), + new RocketMqLitePullConsumerSubscribeInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java new file mode 100644 index 0000000000..166411b68f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqProducerGrayMessageHookInterceptor; + +/** + * SendMessageHook builder declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqProducerGrayMessageHookDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl"; + + private static final String[] METHOD_PARAM_TYPES = { + "org.apache.rocketmq.client.producer.DefaultMQProducer", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqProducerGrayMessageHookInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..9d185c9d15 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor; + +/** + * pull consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqPullConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPullConsumer"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + "java.lang.String", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java new file mode 100644 index 0000000000..9471eafdc7 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPullConsumerFetchInterceptor; + +/** + * pull consumer build consumer client config declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPullConsumerFetchDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl"; + + private static final String METHOD_FETCH_SUBSCRIBE = "fetchSubscribeMessageQueues"; + + private static final String METHOD_FETCH_QUEUE = "fetchMessageQueuesInBalance"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_FETCH_QUEUE, METHOD_FETCH_SUBSCRIBE), + new RocketMqPullConsumerFetchInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java new file mode 100644 index 0000000000..6c22710dfb --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPullConsumerSubscriptionUpdateInterceptor; + +/** + * pull consumer TAG/SQL92 query message statement build declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPullConsumerSubscriptionUpdateDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl"; + + private static final String METHOD_NAME = "getSubscriptionData"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), + new RocketMqPullConsumerSubscriptionUpdateInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..9a4e2ea30f --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor; + +/** + * push consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqPushConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"; + + private static final String PARAMETER_STRING = "java.lang.String"; + + private static final String PARAMETER_HOOK = "org.apache.rocketmq.remoting.RPCHook"; + + private static final String PARAMETER_STRATEGY = "org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy"; + + private static final String[] METHOD_FOUR_PARAM_TYPES = { + PARAMETER_STRING, + PARAMETER_STRING, + PARAMETER_HOOK, + PARAMETER_STRATEGY + }; + + private static final String[] METHOD_SIX_PARAMS = { + PARAMETER_STRING, + PARAMETER_STRING, + PARAMETER_HOOK, + PARAMETER_STRATEGY, + "boolean", + PARAMETER_STRING + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_FOUR_PARAM_TYPES) + .or(MethodMatcher.paramTypesEqual(METHOD_SIX_PARAMS))), + new RocketMqConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java new file mode 100644 index 0000000000..1df882cf7b --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPushConsumerSubscribeFetchInterceptor; + +/** + * push consumer build consumer client config declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPushConsumerSubscribeFetchDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl"; + + private static final String METHOD_FETCH_SUBSCRIBE = "fetchSubscribeMessageQueues"; + + private static final String METHOD_SUBSCRIBE = "subscribe"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_SUBSCRIBE, METHOD_FETCH_SUBSCRIBE), + new RocketMqPushConsumerSubscribeFetchInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java new file mode 100644 index 0000000000..0606c8e3dd --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqSchedulerRebuildSubscriptionInterceptor; + +/** + * scheduler update SQL92 query message statement declarer + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqSchedulerRebuildSubscriptionDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.RebalanceImpl"; + + private static final String METHOD_NAME = "getSubscriptionInner"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), + new RocketMqSchedulerRebuildSubscriptionInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractInterceptor.java new file mode 100644 index 0000000000..70bd388e14 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractInterceptor.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.config.MqGrayConfigCache; +import io.sermant.mq.grayscale.config.rocketmq.RocketMqConfigUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +/** + * mq consumer abstract interceptor + * + * @author chengyouling + * @since 2024-09-04 + **/ +public abstract class RocketMqAbstractInterceptor extends AbstractInterceptor { + @Override + public ExecuteContext before(ExecuteContext context) throws Exception { + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) throws Exception { + if (!MqGrayConfigCache.getCacheConfig().isEnabled()) { + return context; + } + return doAfter(context); + } + + /** + * Handling after the intercept point + * + * @param context context + * @return context + * @throws Exception Exception + */ + protected abstract ExecuteContext doAfter(ExecuteContext context) throws Exception; + + /** + * build gray group name and set consumer client configs + * + * @param namesrvAddr namesrvAddr + * @param topic topic + * @param consumerGroup consumerGroup + */ + protected void buildGroupAndClientConfig(String namesrvAddr, String topic, String consumerGroup) { + String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag(); + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, namesrvAddr); + if (StringUtils.isEmpty(grayGroupTag)) { + RocketMqConsumerGroupAutoCheck.setConsumerClientConfig(namesrvAddr, topic, consumerGroup); + RocketMqConfigUtils.setBaseGroupTagChangeMap(subscribeScope, true); + return; + } + RocketMqConfigUtils.setGrayGroupTagChangeMap(subscribeScope, true); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java new file mode 100644 index 0000000000..e9d30452a4 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.config.MqGrayConfigCache; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; + +/** + * DefaultMQPushConsumer/DefaultLitePullConsumer/DefaultMQPullConsumer Constructor method interceptor + * gray scene reset consumerGroup with grayGroupTag + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqConsumerConstructorInterceptor extends AbstractInterceptor { + @Override + public ExecuteContext before(ExecuteContext context) throws Exception { + if (!MqGrayConfigCache.getCacheConfig().isEnabled()) { + return context; + } + String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag(); + if (StringUtils.isEmpty(grayGroupTag)) { + return context; + } + String originGroup = (String) context.getArguments()[1]; + context.getArguments()[1] + = originGroup.contains("_" + grayGroupTag) ? originGroup : originGroup + "_" + grayGroupTag; + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) throws Exception { + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java new file mode 100644 index 0000000000..987cbf93ef --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; + +/** + * LitePullConsumer subscribe/fetchMessageQueues method interceptor + * base scene recording namesrvAddr、topic、group info + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqLitePullConsumerSubscribeInterceptor extends RocketMqAbstractInterceptor { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultLitePullConsumerImpl litePullConsumerImpl = (DefaultLitePullConsumerImpl) context.getObject(); + DefaultLitePullConsumer pullConsumer = litePullConsumerImpl.getDefaultLitePullConsumer(); + buildGroupAndClientConfig(pullConsumer.getNamesrvAddr(), (String) context.getArguments()[0], + pullConsumer.getConsumerGroup()); + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java new file mode 100644 index 0000000000..3da1a3df43 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqGraySendMessageHook; + +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; + +import java.lang.reflect.Method; +import java.util.Optional; + +/** + * SendMessageHook builder interceptor + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqProducerGrayMessageHookInterceptor extends RocketMqAbstractInterceptor { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultMQProducerImpl producer = (DefaultMQProducerImpl) context.getObject(); + Optional method = ReflectUtils.findMethod(producer.getClass(), "registerSendMessageHook", + new Class[]{SendMessageHook.class}); + if (method.isPresent()) { + method.get().invoke(producer, new RocketMqGraySendMessageHook()); + } + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java new file mode 100644 index 0000000000..a8d0943480 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; + +/** + * PullConsumer fetchSubscribeMessageQueues/fetchMessageQueuesInBalance method interceptor + * base scene recording namesrvAddr、topic、group info + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPullConsumerFetchInterceptor extends RocketMqAbstractInterceptor { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultMQPullConsumer pullConsumer = + ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer(); + buildGroupAndClientConfig(pullConsumer.getNamesrvAddr(), (String) context.getArguments()[0], + pullConsumer.getConsumerGroup()); + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java new file mode 100644 index 0000000000..423c71d97b --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * update pull consumer subscription SQL92 query statement interceptor + * + * @author chengyouling + * @since 2024-07-27 + **/ +public class RocketMqPullConsumerSubscriptionUpdateInterceptor extends RocketMqAbstractInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + SubscriptionData subscriptionData = (SubscriptionData) context.getResult(); + if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) { + return context; + } + Optional fieldValue = ReflectUtils.getFieldValue(context.getObject(), "mQClientFactory"); + if (!fieldValue.isPresent()) { + LOGGER.log(Level.SEVERE, "field mQClientFactory is not exist!"); + return context; + } + MQClientInstance instance = (MQClientInstance) fieldValue.get(); + buildSql92SubscriptionData(context, subscriptionData, instance); + return context; + } + + private void buildSql92SubscriptionData(ExecuteContext context, SubscriptionData subscriptionData, + MQClientInstance instance) { + DefaultMQPullConsumer pullConsumer + = ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer(); + String consumerGroup = pullConsumer.getConsumerGroup(); + String namesrvAddr = instance.getClientConfig().getNamesrvAddr(); + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + RocketMqConsumerGroupAutoCheck.setMqClientInstance(subscriptionData.getTopic(), consumerGroup, instance); + } + RocketMqConsumerGroupAutoCheck.SyncCheckGrayConsumerGroup(); + RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask(); + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(subscriptionData.getTopic(), + consumerGroup, namesrvAddr); + RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPushConsumerSubscribeFetchInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPushConsumerSubscribeFetchInterceptor.java new file mode 100644 index 0000000000..9800338e65 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPushConsumerSubscribeFetchInterceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; + +/** + * PushConsumer fetchSubscribeMessageQueues/subscribe method interceptor + * base scene recording namesrvAddr、topic、group info + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqPushConsumerSubscribeFetchInterceptor extends RocketMqAbstractInterceptor { + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + DefaultMQPushConsumerImpl pushConsumerImpl = (DefaultMQPushConsumerImpl) context.getObject(); + DefaultMQPushConsumer pushConsumer = pushConsumerImpl.getDefaultMQPushConsumer(); + buildGroupAndClientConfig(pushConsumer.getNamesrvAddr(), (String) context.getArguments()[0], + pushConsumer.getConsumerGroup()); + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java new file mode 100644 index 0000000000..c10142a8be --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.util.concurrent.ConcurrentMap; + +/** + * TAG/SQL92 query message statement interceptor + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbstractInterceptor { + private final Object lock = new Object(); + + @Override + public ExecuteContext doAfter(ExecuteContext context) throws Exception { + ConcurrentMap map = (ConcurrentMap) context.getResult(); + RebalanceImpl balance = (RebalanceImpl) context.getObject(); + if (balance.getConsumerGroup() == null) { + return context; + } + for (SubscriptionData subscriptionData : map.values()) { + if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) { + continue; + } + buildSql92SubscriptionData(subscriptionData, balance); + } + return context; + } + + private void buildSql92SubscriptionData(SubscriptionData subscriptionData, RebalanceImpl balance) { + synchronized (lock) { + String topic = subscriptionData.getTopic(); + if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) { + return; + } + String consumerGroup = balance.getConsumerGroup(); + MQClientInstance instance = balance.getmQClientFactory(); + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance); + } + RocketMqConsumerGroupAutoCheck.SyncCheckGrayConsumerGroup(); + RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask(); + String namesrvAddr = balance.getmQClientFactory().getClientConfig().getNamesrvAddr(); + resetsSql92SubscriptionData(topic, consumerGroup, subscriptionData, namesrvAddr); + + // update change flag when finished build substr + RocketMqSubscriptionDataUtils.resetTagChangeMap(namesrvAddr, topic, consumerGroup, false); + } + } + + private void resetsSql92SubscriptionData(String topic, String consumerGroup, SubscriptionData subscriptionData, + String namesrvAddr) { + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, + namesrvAddr); + RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java new file mode 100644 index 0000000000..136e522a5c --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java @@ -0,0 +1,260 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.service; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.mq.grayscale.config.ConsumeModeEnum; +import io.sermant.mq.grayscale.config.GrayTagItem; +import io.sermant.mq.grayscale.config.MqGrayConfigCache; +import io.sermant.mq.grayscale.rocketmq.config.RocketMqConsumerClientConfig; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * consumer group auto check service + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqConsumerGroupAutoCheck { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); + + /** + * gray consumer tags at last time + * key: namesrvAddr@topic@consumerGroup + * value: consumer gray tags + */ + private static final Map> LAST_TOPIC_GROUP_GRAY_TAG = new HashMap<>(); + + /** + * client configs for query consumer group + * key: namesrvAddr@topic@consumerGroup + * value: client config + */ + private static final Map CONSUMER_CLIENT_CONFIG_MAP = new HashMap<>(); + + private static final AtomicBoolean START_AUTO_CHECK = new AtomicBoolean(false); + + private static final long INITIAL_DELAY = 10L; + + private static final long ROCKET_MQ_READ_TIMEOUT = 5000L; + + private RocketMqConsumerGroupAutoCheck() { + } + + /** + * set current client MQClientInstance info + * + * @param topic topic + * @param consumerGroup consumerGroup + * @param mqClientInstance mqClientInstance + */ + public static void setMqClientInstance(String topic, String consumerGroup, MQClientInstance mqClientInstance) { + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, + mqClientInstance.getClientConfig().getNamesrvAddr()); + RocketMqConsumerClientConfig currentConfig = CONSUMER_CLIENT_CONFIG_MAP.get(subscribeScope); + if (currentConfig != null && currentConfig.getMqClientInstance() == null) { + currentConfig.setMqClientInstance(mqClientInstance); + } + } + + /** + * sync check gray consumer group is running and update gray tags + */ + public static void SyncCheckGrayConsumerGroup() { + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.AUTO && !START_AUTO_CHECK.get()) { + // sync to obtain current gray consumer group at AUTO mode before start scheduler check group + findGrayConsumerGroupAndUpdateGrayTags(); + } + } + + /** + * start scheduler check gray consumer is changed + */ + public static void startSchedulerCheckGroupTask() { + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.AUTO) { + if (START_AUTO_CHECK.compareAndSet(false, true)) { + EXECUTOR_SERVICE.scheduleWithFixedDelay( + RocketMqConsumerGroupAutoCheck::findGrayConsumerGroupAndUpdateGrayTags, INITIAL_DELAY, + RocketMqGrayscaleConfigUtils.getAutoCheckDelayTime(), TimeUnit.SECONDS); + } + } + } + + /** + * find gray consumer group and update gray tags + */ + public static void findGrayConsumerGroupAndUpdateGrayTags() { + if (CONSUMER_CLIENT_CONFIG_MAP.isEmpty()) { + return; + } + if (!StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + return; + } + if (MqGrayConfigCache.getCacheConfig() == null + || MqGrayConfigCache.getCacheConfig().getGrayscale().isEmpty()) { + return; + } + for (RocketMqConsumerClientConfig clientConfig : CONSUMER_CLIENT_CONFIG_MAP.values()) { + if (clientConfig.getMqClientInstance() == null) { + continue; + } + Set grayTags = findGrayConsumerGroupAndGetTags(clientConfig); + LOGGER.log(Level.INFO,"[auto-check] current find gray tags: {0}.", grayTags); + resetAutoCheckGrayTagItems(grayTags, clientConfig); + } + } + + /** + * querying all consumer groups of Topic and Collecting grayGroupTag + * + * @param clientConfig clientConfig + * @return grayTags + */ + private static Set findGrayConsumerGroupAndGetTags(RocketMqConsumerClientConfig clientConfig) { + try { + MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl(); + String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi); + GroupList groupList = mqClientApi.queryTopicConsumeByWho(brokerAddress, clientConfig.getTopic(), + ROCKET_MQ_READ_TIMEOUT); + return getGrayTagsByConsumerGroup(groupList, brokerAddress, mqClientApi, + clientConfig.getConsumerGroup()); + } catch (MQClientException | InterruptedException | RemotingTimeoutException | RemotingSendRequestException + | RemotingConnectException | MQBrokerException e) { + LOGGER.log(Level.FINE, String.format(Locale.ENGLISH, "[auto-check] error, message: %s", + e.getMessage()), e); + } + return new HashSet<>(); + } + + private static Set getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress, + MQClientAPIImpl mqClientApi, String consumerGroup) { + Set grayTags = new HashSet<>(); + for (String group : groupList.getGroupList()) { + try { + List consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group, + ROCKET_MQ_READ_TIMEOUT); + if (consumerIds.isEmpty()) { + continue; + } + String grayTag = StringUtils.substringAfterLast(group, consumerGroup + "_"); + if (!StringUtils.isEmpty(grayTag)) { + grayTags.add(grayTag); + } + } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException + | MQBrokerException | InterruptedException e) { + LOGGER.warning(String.format(Locale.ENGLISH, "[auto-check] can not find ids in group: [%s].", + group)); + } + } + return grayTags; + } + + private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi) + throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, + InterruptedException, MQClientException { + TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(topic, ROCKET_MQ_READ_TIMEOUT, + false); + List brokerList = new ArrayList<>(); + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + brokerList.addAll(brokerData.getBrokerAddrs().values()); + } + + // cluster mode has multiple addresses, just select one + return brokerList.get(0); + } + + /** + * compare current query grayGroupTag with collected last time, reset autoCheckGrayTagItems + * + * @param grayTags grayTags + * @param clientConfig MqConsumerClientConfig + */ + private static void resetAutoCheckGrayTagItems(Set grayTags, RocketMqConsumerClientConfig clientConfig) { + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(clientConfig.getTopic(), + clientConfig.getConsumerGroup(), clientConfig.getAddress()); + if (grayTags.isEmpty()) { + if (LAST_TOPIC_GROUP_GRAY_TAG.containsKey(subscribeScope)) { + RocketMqSubscriptionDataUtils.resetAutoCheckGrayTagItems(new ArrayList<>(), clientConfig); + LAST_TOPIC_GROUP_GRAY_TAG.remove(subscribeScope); + } + return; + } + if (isGrayTagsChanged(grayTags, subscribeScope)) { + List grayTagItems = new ArrayList<>(); + for (String grayTag : grayTags) { + Optional item = MqGrayConfigCache.getCacheConfig().getGrayTagByGroupTag(grayTag); + item.ifPresent(grayTagItems::add); + } + LAST_TOPIC_GROUP_GRAY_TAG.put(subscribeScope, grayTags); + RocketMqSubscriptionDataUtils.resetAutoCheckGrayTagItems(grayTagItems, clientConfig); + } + } + + private static boolean isGrayTagsChanged(Set grayTags, String subscribeScope) { + HashSet currentGroups = new HashSet<>(grayTags); + Set lastTags = LAST_TOPIC_GROUP_GRAY_TAG.get(subscribeScope); + if (LAST_TOPIC_GROUP_GRAY_TAG.containsKey(subscribeScope)) { + currentGroups.removeAll(lastTags); + } + return !currentGroups.isEmpty() || grayTags.size() != lastTags.size(); + } + + /** + * set consumer client config + * + * @param address address + * @param topic topic + * @param consumerGroup consumerGroup + */ + public static void setConsumerClientConfig(String address, String topic, String consumerGroup) { + RocketMqConsumerClientConfig config = new RocketMqConsumerClientConfig(address, topic, consumerGroup); + String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, address); + if (!CONSUMER_CLIENT_CONFIG_MAP.containsKey(subscribeScope)) { + CONSUMER_CLIENT_CONFIG_MAP.put(subscribeScope, config); + } + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGraySendMessageHook.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGraySendMessageHook.java new file mode 100644 index 0000000000..0122cad2cc --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGraySendMessageHook.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.service; + +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; + +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.common.message.Message; + +/** + * gray message send hook service + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqGraySendMessageHook implements SendMessageHook { + @Override + public String hookName() { + return "MqGraySendMessageHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + Message message = context.getMessage(); + + // set traffic tags in message by matching serviceMeta + RocketMqGrayscaleConfigUtils.injectTrafficTagByServiceMeta(message); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java new file mode 100644 index 0000000000..7e5b4e1e58 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.utils; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.config.ConsumeModeEnum; +import io.sermant.mq.grayscale.config.GrayTagItem; +import io.sermant.mq.grayscale.config.MqGrayConfigCache; +import io.sermant.mq.grayscale.config.MqGrayscaleConfig; + +import org.apache.rocketmq.common.message.Message; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; + +/** + * grayscale config util + * + * @author chengyouling + * @since 2024-06-03 + */ +public class RocketMqGrayscaleConfigUtils { + /** + * serviceMeta info + */ + public static final Map MICRO_SERVICE_PROPERTIES = new HashMap<>(); + + /** + * consumerGroup name rule: ^[%|a-zA-Z0-9_-]+$ + */ + private static final Pattern PATTERN = Pattern.compile("[^%|a-zA-Z0-9_-]"); + + static { + ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class); + MICRO_SERVICE_PROPERTIES.put("version", serviceMeta.getVersion()); + if (serviceMeta.getParameters() != null) { + MICRO_SERVICE_PROPERTIES.putAll(serviceMeta.getParameters()); + } + } + + private RocketMqGrayscaleConfigUtils() { + } + + /** + * compare mqGrayscaleConfig with serviceMeta, return match grayGroupTag + * + * @return grayGroupTag + */ + public static String getGrayGroupTag() { + if (!MqGrayConfigCache.getCacheConfig().isEnabled()) { + return ""; + } + Optional itemOptional + = MqGrayConfigCache.getCacheConfig().getMatchedGrayTagByServiceMeta(MICRO_SERVICE_PROPERTIES); + return itemOptional.map(grayTagItem -> standardFormatGroupTag(grayTagItem.getConsumerGroupTag())).orElse(""); + } + + /** + * get current consumerType + * + * @return consumeType + */ + public static ConsumeModeEnum getConsumeType() { + return MqGrayConfigCache.getCacheConfig().getBase().getConsumeMode(); + } + + /** + * get interval for scheduler find gray consumer + * + * @return delayTime + */ + public static long getAutoCheckDelayTime() { + return MqGrayConfigCache.getCacheConfig().getBase().getAutoCheckDelayTime(); + } + + /** + * format grayGroupTag + * + * @param grayGroupTag grayGroupTag + * @return standard grayGroupTag + */ + public static String standardFormatGroupTag(String grayGroupTag) { + return PATTERN.matcher(grayGroupTag.toLowerCase(Locale.ROOT)).replaceAll("-"); + } + + /** + * compare serviceMeta with mqGrayscaleConfig, set message property + * + * @param message message + */ + public static void injectTrafficTagByServiceMeta(Message message) { + if (!MqGrayConfigCache.getCacheConfig().isEnabled()) { + return; + } + Map grayTags + = MqGrayConfigCache.getCacheConfig().getGrayTagsByServiceMeta(MICRO_SERVICE_PROPERTIES); + if (grayTags.isEmpty()) { + return; + } + for (Map.Entry entry : grayTags.entrySet()) { + if (StringUtils.isEmpty(message.getProperty(entry.getKey()))) { + message.putUserProperty(entry.getKey(), entry.getValue()); + } + } + } + + /** + * get GrayTagItems by set excludeGroupTags + * + * @return GrayTagItems + */ + public static List getGrayTagItemByExcludeGroupTags() { + MqGrayscaleConfig mqGrayscaleConfig = MqGrayConfigCache.getCacheConfig(); + List result = new ArrayList<>(); + if (mqGrayscaleConfig.getBase() == null || mqGrayscaleConfig.getBase().getExcludeGroupTags().isEmpty()) { + return result; + } + for (String excludeGroupTag : mqGrayscaleConfig.getBase().getExcludeGroupTags()) { + if (mqGrayscaleConfig.getGrayTagByGroupTag(excludeGroupTag).isPresent()) { + result.add(mqGrayscaleConfig.getGrayTagByGroupTag(excludeGroupTag).get()); + } + } + return result; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/SubscriptionDataUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java similarity index 72% rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/SubscriptionDataUtils.java rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java index 172f68c560..2511a9aad6 100644 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/SubscriptionDataUtils.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java @@ -14,12 +14,14 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.utils; +package io.sermant.mq.grayscale.rocketmq.utils; import io.sermant.core.common.LoggerFactory; import io.sermant.mq.grayscale.config.ConsumeModeEnum; import io.sermant.mq.grayscale.config.GrayTagItem; -import io.sermant.mq.grayscale.config.MqConsumerClientConfig; +import io.sermant.mq.grayscale.config.MqGrayConfigCache; +import io.sermant.mq.grayscale.config.rocketmq.RocketMqConfigUtils; +import io.sermant.mq.grayscale.rocketmq.config.RocketMqConsumerClientConfig; import io.sermant.mq.grayscale.config.MqGrayscaleConfig; import org.apache.commons.lang3.StringUtils; @@ -44,7 +46,7 @@ * @author chengyouling * @since 2024-06-03 */ -public class SubscriptionDataUtils { +public class RocketMqSubscriptionDataUtils { /** * tag consume message type */ @@ -76,20 +78,6 @@ public class SubscriptionDataUtils { */ private static final Map> AUTO_CHECK_GRAY_TAGS = new ConcurrentHashMap<>(); - /** - * base instance subscript gray tag change flags - * key: namesrvAddr@topic@consumerGroup - * value: change flag - */ - private static final Map BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP = new ConcurrentHashMap<>(); - - /** - * gray instance subscript gray tag change flags - * key: namesrvAddr@topic@consumerGroup - * value: change flag - */ - private static final Map GRAY_GROUP_TAG_CHANGE_MAP = new ConcurrentHashMap<>(); - private static final String RETYPE = "%RETRY%"; private static final String RIGHT_BRACKET = ")"; @@ -98,7 +86,7 @@ public class SubscriptionDataUtils { private static final String AND_SPLICE_STR = " and "; - private SubscriptionDataUtils() { + private RocketMqSubscriptionDataUtils() { } /** @@ -130,15 +118,15 @@ private static String getStrForSets(Set tags) { * add current gray tags to sql92 expression * * @param originSubData originSubData - * @param addrTopicGroupKey addrTopicGroupKey + * @param subscribeScope subscribeScope * @return sql expression */ - public static String addGrayTagsToSql92Expression(String originSubData, String addrTopicGroupKey) { + public static String addGrayTagsToSql92Expression(String originSubData, String subscribeScope) { String originSubDataBak = originSubData; if (!StringUtils.isBlank(originSubDataBak)) { originSubDataBak = rebuildWithoutGrayTagSubData(originSubDataBak); } - String sql92Expression = buildSql92Expression(addrTopicGroupKey); + String sql92Expression = buildSql92Expression(subscribeScope); if (StringUtils.isBlank(sql92Expression)) { return originSubDataBak; } @@ -146,12 +134,12 @@ public static String addGrayTagsToSql92Expression(String originSubData, String a ? sql92Expression : originSubDataBak + AND_SPLICE_STR + sql92Expression; } - private static String buildSql92Expression(String addrTopicGroupKey) { + private static String buildSql92Expression(String subscribeScope) { StringBuilder sb = new StringBuilder(); - if (StringUtils.isEmpty(MqGrayscaleConfigUtils.getGrayGroupTag())) { + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { // base model return without exclude group message - if (MqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.BASE) { - List items = MqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.BASE) { + List items = RocketMqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); if (!items.isEmpty()) { sb.append(buildBaseConsumerSql92Expression(items)); } @@ -159,24 +147,24 @@ private static String buildSql92Expression(String addrTopicGroupKey) { } // auto model return without exclude group and current consume message gray group message - sb.append(buildBaseConsumerSql92Expression(getAutoTypeGrayTagItems(addrTopicGroupKey))); + sb.append(buildBaseConsumerSql92Expression(getAutoTypeGrayTagItems(subscribeScope))); } else { - MqGrayscaleConfig mqGrayscaleConfig = MqGrayscaleConfigUtils.getGrayscaleConfigs(); + MqGrayscaleConfig mqGrayscaleConfig = MqGrayConfigCache.getCacheConfig(); Optional grayTagItem - = mqGrayscaleConfig.getGrayTagByGroupTag(MqGrayscaleConfigUtils.getGrayGroupTag()); + = mqGrayscaleConfig.getGrayTagByGroupTag(RocketMqGrayscaleConfigUtils.getGrayGroupTag()); if (grayTagItem.isPresent()) { sb.append(buildGrayConsumerSql92Expression(grayTagItem.get())); } else { LOGGER.warning(String.format(Locale.ENGLISH, "current gray group [%s] had not set grayscale, set it " - + "and restart service to valid.", MqGrayscaleConfigUtils.getGrayGroupTag())); + + "and restart service to valid.", RocketMqGrayscaleConfigUtils.getGrayGroupTag())); } } return sb.toString(); } - private static List getAutoTypeGrayTagItems(String addrTopicGroupKey) { - List excludeItems = MqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); - List autoDiscoveryGrayTags = AUTO_CHECK_GRAY_TAGS.get(addrTopicGroupKey); + private static List getAutoTypeGrayTagItems(String subscribeScope) { + List excludeItems = RocketMqGrayscaleConfigUtils.getGrayTagItemByExcludeGroupTags(); + List autoDiscoveryGrayTags = AUTO_CHECK_GRAY_TAGS.get(subscribeScope); if (autoDiscoveryGrayTags != null && !autoDiscoveryGrayTags.isEmpty()) { excludeItems.addAll(autoDiscoveryGrayTags); } @@ -272,7 +260,7 @@ private static String rebuildWithoutGrayTagSubData(String originSubData) { } private static boolean containsGrayTags(String condition) { - for (String key : MqGrayscaleConfigUtils.getGrayTagsSet()) { + for (String key : RocketMqConfigUtils.getGrayTagsSet()) { if (condition.contains(key)) { return true; } @@ -284,16 +272,16 @@ private static boolean containsGrayTags(String condition) { * reset subString with gray tags * * @param subscriptionData subscriptionData - * @param addrTopicGroupKey addrTopicGroupKey + * @param subscribeScope subscribeScope */ - public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData, String addrTopicGroupKey) { + public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData, String subscribeScope) { String originSubData; if (EXPRESSION_TYPE_TAG.equals(subscriptionData.getExpressionType())) { originSubData = buildSql92ExpressionByTags(subscriptionData.getTagsSet()); } else { originSubData = subscriptionData.getSubString(); } - String newSubStr = addGrayTagsToSql92Expression(originSubData, addrTopicGroupKey); + String newSubStr = addGrayTagsToSql92Expression(originSubData, subscribeScope); if (StringUtils.isEmpty(newSubStr)) { newSubStr = SELECT_ALL_MESSAGE_SQL; } @@ -305,7 +293,7 @@ public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData subscriptionData.setSubString(newSubStr); subscriptionData.setSubVersion(System.currentTimeMillis()); LOGGER.warning(String.format(Locale.ENGLISH, "update [key: %s] SQL92 subscriptionData, originSubStr: " - + "[%s], newSubStr: [%s]", addrTopicGroupKey, originSubData, newSubStr)); + + "[%s], newSubStr: [%s]", subscribeScope, originSubData, newSubStr)); } /** @@ -314,52 +302,27 @@ public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData * @param grayTagItems grayTagItems * @param clientConfig clientConfig */ - public static void resetAutoCheckGrayTagItems(List grayTagItems, MqConsumerClientConfig clientConfig) { - String addrTopicGroupKey = buildAddrTopicGroupKey(clientConfig.getTopic(), clientConfig.getConsumerGroup(), + public static void resetAutoCheckGrayTagItems(List grayTagItems, + RocketMqConsumerClientConfig clientConfig) { + String subscribeScope = buildSubscribeScope(clientConfig.getTopic(), clientConfig.getConsumerGroup(), clientConfig.getAddress()); - AUTO_CHECK_GRAY_TAGS.remove(addrTopicGroupKey); - setAutoCheckTagChangeMap(clientConfig.getAddress(), clientConfig.getTopic(), clientConfig.getConsumerGroup(), - true); + AUTO_CHECK_GRAY_TAGS.remove(subscribeScope); + RocketMqConfigUtils.setBaseGroupTagChangeMap(buildSubscribeScope(clientConfig.getTopic(), + clientConfig.getConsumerGroup(), clientConfig.getAddress()), true); if (!grayTagItems.isEmpty()) { - AUTO_CHECK_GRAY_TAGS.put(addrTopicGroupKey, grayTagItems); + AUTO_CHECK_GRAY_TAGS.put(subscribeScope, grayTagItems); } } /** - * set base consumer address@topic@group correspondents change flag - * - * @param namesrvAddr namesrvAddr - * @param topic topic - * @param group group - * @param flag flag - */ - public static void setAutoCheckTagChangeMap(String namesrvAddr, String topic, String group, boolean flag) { - String addrTopicGroupKey = buildAddrTopicGroupKey(topic, group, namesrvAddr); - BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.put(addrTopicGroupKey, flag); - } - - /** - * set gray consumer address@topic@group correspondents change flag - * - * @param namesrvAddr namesrvAddr - * @param topic topic - * @param group group - * @param flag flag - */ - public static void setGrayGroupTagChangeMap(String namesrvAddr, String topic, String group, boolean flag) { - String addrTopicGroupKey = buildAddrTopicGroupKey(topic, group, namesrvAddr); - GRAY_GROUP_TAG_CHANGE_MAP.put(addrTopicGroupKey, flag); - } - - /** - * using namesrvAddr/topic/consumerGroup build key + * using namesrvAddr/topic/consumerGroup build subscribe scope * * @param topic topic * @param consumerGroup consumerGroup * @param namesrvAddr namesrvAddr - * @return namesrvAddr@topic@consumerGroup + * @return subscribeScope */ - public static String buildAddrTopicGroupKey(String topic, String consumerGroup, String namesrvAddr) { + public static String buildSubscribeScope(String topic, String consumerGroup, String namesrvAddr) { String topicTemp = topic.contains(RETYPE) ? StringUtils.substringAfterLast(topic, RETYPE) : topic; String consumerGroupTemp = consumerGroup.contains(RETYPE) ? StringUtils.substringAfterLast(consumerGroup, RETYPE) : consumerGroup; @@ -374,22 +337,12 @@ public static String buildAddrTopicGroupKey(String topic, String consumerGroup, * @return changeFlag */ public static boolean getGrayTagChangeFlag(String topic, RebalanceImpl rebalance) { - String addrTopicGroupKey = buildAddrTopicGroupKey(topic, rebalance.getConsumerGroup(), + String subscribeScope = buildSubscribeScope(topic, rebalance.getConsumerGroup(), rebalance.getmQClientFactory().getClientConfig().getNamesrvAddr()); - if (StringUtils.isEmpty(MqGrayscaleConfigUtils.getGrayGroupTag())) { - return BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.get(addrTopicGroupKey) != null - && BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.get(addrTopicGroupKey); + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + return RocketMqConfigUtils.getBaseGroupTagChangeMap(subscribeScope); } - return GRAY_GROUP_TAG_CHANGE_MAP.get(addrTopicGroupKey) != null - && GRAY_GROUP_TAG_CHANGE_MAP.get(addrTopicGroupKey); - } - - /** - * update all consumer gray tag change flag - */ - public static void updateChangeFlag() { - BASE_SUBSCRIPT_GRAY_TAG_CHANGE_MAP.replaceAll((k, v) -> true); - GRAY_GROUP_TAG_CHANGE_MAP.replaceAll((k, v) -> true); + return RocketMqConfigUtils.getGrayGroupTagChangeMap(subscribeScope); } /** @@ -401,10 +354,25 @@ public static void updateChangeFlag() { * @param flag flag */ public static void resetTagChangeMap(String namesrvAddr, String topic, String consumerGroup, boolean flag) { - if (StringUtils.isEmpty(MqGrayscaleConfigUtils.getGrayGroupTag())) { - setAutoCheckTagChangeMap(namesrvAddr, topic, consumerGroup, flag); + String subscribeScope = buildSubscribeScope(topic, consumerGroup, namesrvAddr); + if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { + RocketMqConfigUtils.setBaseGroupTagChangeMap(subscribeScope, flag); } else { - setGrayGroupTagChangeMap(namesrvAddr, topic, consumerGroup, flag); + RocketMqConfigUtils.setGrayGroupTagChangeMap(subscribeScope, flag); + } + } + + /** + * check expressionType is inaccurate + * + * @param expressionType expressionType + * @return is inaccurate + */ + public static boolean isExpressionTypeInaccurate(String expressionType) { + if (!EXPRESSION_TYPE_SQL92.equals(expressionType) && !EXPRESSION_TYPE_TAG.equals(expressionType)) { + LOGGER.warning(String.format(Locale.ENGLISH, "can not process expressionType: %s", expressionType)); + return true; } + return false; } } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer new file mode 100644 index 0000000000..23a49a7aff --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -0,0 +1,25 @@ +# +# Copyright (C) 2024-2024 Sermant Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqProducerGrayMessageHookDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPullConsumerSubscriptionUpdateDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqSchedulerRebuildSubscriptionDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqLitePullConsumerSubscribeDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPullConsumerFetchDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPushConsumerSubscribeFetchDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqLitePullConsumerConstructorDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPullConsumerConstructorDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPushConsumerConstructorDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/pom.xml b/sermant-plugins/sermant-mq-grayscale/pom.xml index 92b4865d42..503066a637 100644 --- a/sermant-plugins/sermant-mq-grayscale/pom.xml +++ b/sermant-plugins/sermant-mq-grayscale/pom.xml @@ -41,19 +41,25 @@ true - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin + mq-config-common + mq-config-service test - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin + mq-config-common + mq-config-service release - sermant-mq-grayscale-plugin + mq-grayscale-rocketmq-plugin + mq-config-common + mq-config-service diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/MqGrayscaleConfigUtils.java b/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/MqGrayscaleConfigUtils.java deleted file mode 100644 index 7bfe97f6a8..0000000000 --- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/utils/MqGrayscaleConfigUtils.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.sermant.mq.grayscale.utils; - -import io.sermant.core.config.ConfigManager; -import io.sermant.core.plugin.config.PluginConfigManager; -import io.sermant.core.plugin.config.ServiceMeta; -import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; -import io.sermant.mq.grayscale.config.ConsumeModeEnum; -import io.sermant.mq.grayscale.config.GrayTagItem; -import io.sermant.mq.grayscale.config.MqGrayscaleConfig; - -import org.apache.rocketmq.common.message.Message; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.regex.Pattern; - -/** - * grayscale config util - * - * @author chengyouling - * @since 2024-06-03 - */ -public class MqGrayscaleConfigUtils { - /** - * serviceMeta info - */ - public static final Map MICRO_SERVICE_PROPERTIES = new HashMap<>(); - - private static MqGrayscaleConfig cacheConfig = PluginConfigManager.getPluginConfig(MqGrayscaleConfig.class); - - /** - * all traffic tags that's been set, using for sql92 expression reset - */ - private static final Set GRAY_TAGS_SET = new HashSet<>(); - - /** - * consumerGroup name rule: ^[%|a-zA-Z0-9_-]+$ - */ - private static final Pattern PATTERN = Pattern.compile("[^%|a-zA-Z0-9_-]"); - - static { - ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class); - MICRO_SERVICE_PROPERTIES.put("version", serviceMeta.getVersion()); - if (serviceMeta.getParameters() != null) { - MICRO_SERVICE_PROPERTIES.putAll(serviceMeta.getParameters()); - } - } - - private MqGrayscaleConfigUtils() { - } - - /** - * compare mqGrayscaleConfig with serviceMeta, return match grayGroupTag - * - * @return grayGroupTag - */ - public static String getGrayGroupTag() { - if (!cacheConfig.isEnabled()) { - return ""; - } - Optional itemOptional = cacheConfig.getMatchedGrayTagByServiceMeta(MICRO_SERVICE_PROPERTIES); - return itemOptional.map(grayTagItem -> standardFormatGroupTag(grayTagItem.getConsumerGroupTag())).orElse(""); - } - - /** - * get current consumerType - * - * @return consumeType - */ - public static ConsumeModeEnum getConsumeType() { - return cacheConfig.getBase().getConsumeMode(); - } - - /** - * get interval for scheduler find gray consumer - * - * @return delayTime - */ - public static long getAutoCheckDelayTime() { - return cacheConfig.getBase().getAutoCheckDelayTime(); - } - - /** - * format grayGroupTag - * - * @param grayGroupTag grayGroupTag - * @return standard grayGroupTag - */ - public static String standardFormatGroupTag(String grayGroupTag) { - return PATTERN.matcher(grayGroupTag.toLowerCase(Locale.ROOT)).replaceAll("-"); - } - - /** - * reset cache mqGrayscaleConfig - */ - public static void resetGrayscaleConfig() { - cacheConfig = new MqGrayscaleConfig(); - } - - /** - * set/update cache mqGrayscaleConfig - * - * @param config config - * @param eventType eventType - */ - public static void setGrayscaleConfig(MqGrayscaleConfig config, DynamicConfigEventType eventType) { - buildGrayTagsSet(config); - if (eventType == DynamicConfigEventType.CREATE) { - cacheConfig = config; - SubscriptionDataUtils.updateChangeFlag(); - return; - } - boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, config); - if (isAllowRefresh) { - cacheConfig.updateGrayscaleConfig(config); - SubscriptionDataUtils.updateChangeFlag(); - } - } - - private static void buildGrayTagsSet(MqGrayscaleConfig config) { - for (GrayTagItem item : config.getGrayscale()) { - if (!item.getTrafficTag().isEmpty()) { - GRAY_TAGS_SET.addAll(item.getTrafficTag().keySet()); - } - } - } - - /** - * only traffic label changes allow refresh tag change map to rebuild SQL92 query statement, - * because if the serviceMeta changed, the gray consumer cannot be matched and becomes a base consumer - * so, if you need to change the env tag, restart all services. - * - * @param resource cache config - * @param target cache config - * @return boolean - */ - private static boolean isAllowRefreshChangeFlag(MqGrayscaleConfig resource, MqGrayscaleConfig target) { - if (resource.isEnabled() != target.isEnabled()) { - return true; - } - if (resource.isBaseExcludeGroupTagsChanged(target)) { - return true; - } - if (resource.isConsumerModeChanged(target)) { - return true; - } - return !resource.buildAllTrafficTagInfoToStr().equals(target.buildAllTrafficTagInfoToStr()); - } - - /** - * get plugin enabled - * - * @return plugin enabled - */ - public static boolean isPluginEnabled() { - return cacheConfig.isEnabled(); - } - - /** - * compare serviceMeta with mqGrayscaleConfig, set message property - * - * @param message message - */ - public static void setUserPropertyByServiceMeta(Message message) { - if (!cacheConfig.isEnabled()) { - return; - } - Map grayTags = cacheConfig.getGrayTagsByServiceMeta(MICRO_SERVICE_PROPERTIES); - if (grayTags.isEmpty()) { - return; - } - for (Map.Entry entry : grayTags.entrySet()) { - message.putUserProperty(entry.getKey(), entry.getValue()); - } - } - - /** - * get cache mqGrayscaleConfig - * - * @return MqGrayscaleConfig - */ - public static MqGrayscaleConfig getGrayscaleConfigs() { - return cacheConfig; - } - - /** - * get GrayTagItems by set excludeGroupTags - * - * @return GrayTagItems - */ - public static List getGrayTagItemByExcludeGroupTags() { - MqGrayscaleConfig mqGrayscaleConfig = getGrayscaleConfigs(); - if (mqGrayscaleConfig.getBase() == null || mqGrayscaleConfig.getBase().getExcludeGroupTags().isEmpty()) { - return Collections.emptyList(); - } - List result = new ArrayList<>(); - for (String excludeGroupTag : mqGrayscaleConfig.getBase().getExcludeGroupTags()) { - if (mqGrayscaleConfig.getGrayTagByGroupTag(excludeGroupTag).isPresent()) { - result.add(mqGrayscaleConfig.getGrayTagByGroupTag(excludeGroupTag).get()); - } - } - return result; - } - - /** - * get all config set gray tags - * - * @return all config set gray tags - */ - public static Set getGrayTagsSet() { - return GRAY_TAGS_SET; - } -}