Skip to content

Commit

Permalink
rocketmq禁止消费开发-controller、拦截点
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <1449308021@qq.com>
  • Loading branch information
daizhenyu committed Dec 9, 2023
1 parent 6520598 commit aad721d
Show file tree
Hide file tree
Showing 11 changed files with 966 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,27 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>consumer-controller</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-agentcore-god</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.cache;

import com.huaweicloud.sermant.rocketmq.warpper.DefaultLitePullConsumerWrapper;
import com.huaweicloud.sermant.rocketmq.warpper.DefaultMqPushConsumerWrapper;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* rocketmq消费者缓存
*
* @author daizhenyu
* @since 2023-12-04
**/
public class RocketMqConsumerCache {
private static Set<DefaultMqPushConsumerWrapper> pushConsumerCache =
new CopyOnWriteArraySet<>();

private static Set<DefaultLitePullConsumerWrapper> pullConsumerCache =
new CopyOnWriteArraySet<>();

private RocketMqConsumerCache() {
}

/**
* 更新PushConsumer缓存
*
* @param pushConsumerWrapper pushConsumer包装类实例
*/
public static void updatePushConsumerCache(DefaultMqPushConsumerWrapper pushConsumerWrapper) {
pushConsumerCache.add(pushConsumerWrapper);
}

/**
* 更新PullConsumer缓存
*
* @param pullConsumerWrapper pullConsumer包装类实例
*/
public static void updatePullConsumerCache(DefaultLitePullConsumerWrapper pullConsumerWrapper) {
pullConsumerCache.add(pullConsumerWrapper);
}

/**
* 获取订阅topics的pushConsumer列表
*
* @param topics 订阅主题Set
* @return pushConsumer列表
*/
public static Set<DefaultMqPushConsumerWrapper> getPushConsumersSubscribed2Topics(Set<String> topics) {
Set<DefaultMqPushConsumerWrapper> matchedConsumers = new HashSet<>();
for (DefaultMqPushConsumerWrapper wrapper : pushConsumerCache) {
Set<String> subscribedTopic = wrapper.getSubscribedTopic();
if (subscribedTopic.stream().anyMatch(topics::contains)) {
matchedConsumers.add(wrapper);
}
}
return matchedConsumers;
}

/**
* 获取未订阅topics的pushConsumer列表
*
* @param topics 订阅主题Set
* @return pushConsumer列表
*/
public static Set<DefaultMqPushConsumerWrapper> getPushConsumersUnsubscribed2Topics(Set<String> topics) {
Set<DefaultMqPushConsumerWrapper> unmatchedConsumers = new HashSet<>();
for (DefaultMqPushConsumerWrapper wrapper : pushConsumerCache) {
Set<String> subscribedTopic = wrapper.getSubscribedTopic();
if (!subscribedTopic.stream().anyMatch(topics::contains)) {
unmatchedConsumers.add(wrapper);
}
}
return unmatchedConsumers;
}

/**
* 获取订阅topics的pullConsumer列表
*
* @param topics 订阅主题Set
* @return pullConsumer列表
*/
public static Set<DefaultLitePullConsumerWrapper> getPullConsumersSubscribed2Topics(Set<String> topics) {
Set<DefaultLitePullConsumerWrapper> matchedConsumers = new HashSet<>();
for (DefaultLitePullConsumerWrapper wrapper : pullConsumerCache) {
Set<String> subscribedTopic = wrapper.getSubscribedTopic();
if (subscribedTopic.stream().anyMatch(topics::contains)) {
matchedConsumers.add(wrapper);
}
}
return matchedConsumers;
}

/**
* 获取未订阅topics的pullConsumer列表
*
* @param topics 订阅主题Set
* @return pullConsumer列表
*/
public static Set<DefaultLitePullConsumerWrapper> getPullConsumersUnsubscribed2Topics(Set<String> topics) {
Set<DefaultLitePullConsumerWrapper> unmatchedConsumers = new HashSet<>();
for (DefaultLitePullConsumerWrapper wrapper : pullConsumerCache) {
Set<String> subscribedTopic = wrapper.getSubscribedTopic();
if (!subscribedTopic.stream().anyMatch(topics::contains)) {
unmatchedConsumers.add(wrapper);
}
}
return unmatchedConsumers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.constant;

/**
* rocketmq pull消费者订阅方式枚举类
*
* @author daizhenyu
* @since 2023-12-05
**/
public enum SubscriptionType {
/**
* 订阅方式为NONE
*/
NONE("NONE"),
/**
* 通过订阅topic进行消费
*/
SUBSCRIBE("SUBSCRIBE"),
/**
* 通过指定队列进行消费
*/
ASSIGN("ASSIGN");

private final String subscriptionName;

SubscriptionType(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

public String getSubscriptionTypeName() {
return this.subscriptionName;
}

/**
* 用于根据订阅方式名称获取订阅枚举对象
*
* @param name 订阅方式名称
* @return SubscriptionType
*/
public static SubscriptionType getSubscriptionTypeByName(String name) {
for (SubscriptionType value : values()) {
if (value.getSubscriptionTypeName().equals(name)) {
return value;
}
}
return NONE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.controller;

import com.huaweicloud.sermant.rocketmq.cache.RocketMqConsumerCache;
import com.huaweicloud.sermant.rocketmq.warpper.DefaultLitePullConsumerWrapper;
import com.huaweicloud.sermant.rocketmq.warpper.DefaultMqPushConsumerWrapper;

import java.util.Set;

/**
* 消费者控制类
*
* @author daizhenyu
* @since 2023-12-04
**/
public class RocketMqConsumerController {
private RocketMqConsumerController() {
}

/**
* 禁止订阅了topics集合的消费者消费,不属于topics集合的消费者恢复消费
*
* @param topics
*/
public static void disableConsumer(Set<String> topics) {
suspendConsumer(topics);
resumeConsumer(topics);
}

/**
* 禁止订阅了topics集合的消费者消费
*
* @param topics 订阅主题Set
*/
private static void suspendConsumer(Set<String> topics) {
suspendPushConsumer(topics);
suspendPullConsumer(topics);
}

/**
* 不属于topics集合的消费者恢复消费
*
* @param topics 订阅主题Set
*/
private static void resumeConsumer(Set<String> topics) {
resumePushConsumer(topics);
resumePullConsumer(topics);
}

/**
* 更新PushConsumer缓存
*
* @param pushConsumerWrapper pushConsumer包装类实例
*/
public static void updatePushConsumerCache(DefaultMqPushConsumerWrapper pushConsumerWrapper) {
RocketMqConsumerCache.updatePushConsumerCache(pushConsumerWrapper);
}

/**
* 更新PullConsumer缓存
*
* @param pullConsumerWrapper pullConsumer包装类实例
*/
public static void updatePullConsumerCache(DefaultLitePullConsumerWrapper pullConsumerWrapper) {
RocketMqConsumerCache.updatePullConsumerCache(pullConsumerWrapper);
}

/**
* 获取订阅topics的pushConsumer列表
*
* @param topics 订阅主题Set
* @return pushConsumer列表
*/
public static Set<DefaultMqPushConsumerWrapper> getPushConsumersSubscribed2Topics(Set<String> topics) {
return RocketMqConsumerCache.getPushConsumersSubscribed2Topics(topics);
}

/**
* 获取未订阅topics的pushConsumer列表
*
* @param topics 订阅主题Set
* @return pushConsumer列表
*/
public static Set<DefaultMqPushConsumerWrapper> getPushConsumersUnsubscribed2Topics(Set<String> topics) {
return RocketMqConsumerCache.getPushConsumersUnsubscribed2Topics(topics);
}

/**
* 获取订阅topics的pullConsumer列表
*
* @param topics 订阅主题Set
* @return pullConsumer列表
*/
public static Set<DefaultLitePullConsumerWrapper> getPullConsumersSubscribed2Topics(Set<String> topics) {
return RocketMqConsumerCache.getPullConsumersSubscribed2Topics(topics);
}

/**
* 获取未订阅topics的pullConsumer列表
*
* @param topics 订阅主题Set
* @return pullConsumer列表
*/
public static Set<DefaultLitePullConsumerWrapper> getPullConsumersUnsubscribed2Topics(Set<String> topics) {
return RocketMqConsumerCache.getPullConsumersUnsubscribed2Topics(topics);
}

private static void suspendPushConsumer(Set<String> topics) {
Set<DefaultMqPushConsumerWrapper> pushConsumers = getPushConsumersSubscribed2Topics(topics);
for (DefaultMqPushConsumerWrapper pushConsumer : pushConsumers) {
pushConsumer.suspendConsumer();
}
}

private static void suspendPullConsumer(Set<String> topics) {
Set<DefaultLitePullConsumerWrapper> pullConsumers = getPullConsumersSubscribed2Topics(topics);
for (DefaultLitePullConsumerWrapper pullConsumer : pullConsumers) {
pullConsumer.suspendConsumer();
}
}

private static void resumePushConsumer(Set<String> topics) {
Set<DefaultMqPushConsumerWrapper> pushConsumers = getPushConsumersUnsubscribed2Topics(topics);
for (DefaultMqPushConsumerWrapper pushConsumer : pushConsumers) {
pushConsumer.resumeConsumer();
}
}

private static void resumePullConsumer(Set<String> topics) {
Set<DefaultLitePullConsumerWrapper> pullConsumers = getPullConsumersUnsubscribed2Topics(topics);
for (DefaultLitePullConsumerWrapper pullConsumer : pullConsumers) {
pullConsumer.resumeConsumer();
}
}
}
Loading

0 comments on commit aad721d

Please sign in to comment.