Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rocketmq consumption prohibition plugin: controller #1384

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,27 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>consumer-controller</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-agentcore-god</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.cache;

import com.huaweicloud.sermant.rocketmq.wrapper.DefaultLitePullConsumerWrapper;
import com.huaweicloud.sermant.rocketmq.wrapper.DefaultMqPushConsumerWrapper;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* rocketmq消费者缓存
*
* @author daizhenyu
* @since 2023-12-04
**/
public class RocketMqConsumerCache {
/**
* push消费者wrapper缓存
*/
public static final Set<DefaultMqPushConsumerWrapper> PUSH_CONSUMERS_CACHE =
new CopyOnWriteArraySet<>();

/**
* pull消费者wrapper缓存
*/
public static final Set<DefaultLitePullConsumerWrapper> PULL_CONSUMERS_CACHE =
new CopyOnWriteArraySet<>();

private RocketMqConsumerCache() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.constant;

/**
* rocketmq pull消费者订阅方式枚举类
*
* @author daizhenyu
* @since 2023-12-05
**/
public enum SubscriptionType {
/**
* 订阅方式为NONE
*/
NONE("NONE"),
luanwenfei-venus marked this conversation as resolved.
Show resolved Hide resolved
/**
* 通过订阅topic进行消费
*/
SUBSCRIBE("SUBSCRIBE"),
/**
* 通过指定队列进行消费
*/
ASSIGN("ASSIGN");

private final String subscriptionName;

SubscriptionType(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

public String getSubscriptionTypeName() {
return this.subscriptionName;
}

/**
* 用于根据订阅方式名称获取订阅枚举对象
*
* @param name 订阅方式名称
* @return SubscriptionType
*/
public static SubscriptionType getSubscriptionTypeByName(String name) {
for (SubscriptionType value : values()) {
if (value.getSubscriptionTypeName().equals(name)) {
return value;
}
}
return NONE;
}
luanwenfei-venus marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.extension;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;

/**
* RocketmqConsumer处理器接口,供外部实现在rocketmq消费者拦截点执行扩展操作
*
* @author daizhenyu
* @since 2023-12-13
**/
public interface RocketMqConsumerHandler {
/**
* 拦截点前置处理
*
* @param context 上下文信息
*/
void doBefore(ExecuteContext context);

/**
* 拦截点后置处理
*
* @param context 上下文信息
*/
void doAfter(ExecuteContext context);

/**
* 拦截点异常处理
*
* @param context 上下文信息
*/
void doOnThrow(ExecuteContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.wrapper;

import org.apache.rocketmq.client.impl.factory.MQClientInstance;

import java.util.HashSet;
import java.util.Set;

/**
* 消费者包装抽象类
*
* @author daizhenyu
* @since 2023-12-04
**/
public abstract class AbstractConsumerWrapper {
lilai23 marked this conversation as resolved.
Show resolved Hide resolved
/**
* rocketmq消费者的私有属性,用于加入或退出消费者组
*/
protected final MQClientInstance clientFactory;

/**
* 消费者是否已经禁止消费
*/
protected boolean pause = false;

/**
* nameserver地址
*/
protected String nameServerAddress;

/**
* rocketmq消费者组
*/
protected String consumerGroup;

/**
* 消费者实例ip
*/
protected String clientIp;

/**
* 消费者实例名称
*/
protected String instanceName;

/**
* 当前消费者的服务所在可用区
*/
protected String zone;

/**
* 当前消费者的服务所在可用区命名空间
*/
protected String project;

/**
* 当前消费者的服务所在环境
*/
protected String environment;

/**
* 当前消费者的服务所在应用
*/
protected String application;

/**
* 当前消费者所在服务的名称
*/
protected String service;

/**
* 消费者已订阅消费主题
*/
protected Set<String> subscribedTopics = new HashSet<>();

/**
* 有参构造方法
*
* @param clientFactory 消费者内部工厂类
*/
protected AbstractConsumerWrapper(MQClientInstance clientFactory) {
this.clientFactory = clientFactory;
}

/**
* 初始化消费者实例的信息
*/
protected abstract void initClientInfo();

public boolean isPause() {
return pause;
}

public void setPause(boolean pause) {
this.pause = pause;
}

public MQClientInstance getClientFactory() {
return clientFactory;
}

public String getNameServerAddress() {
return nameServerAddress;
}

public String getConsumerGroup() {
return consumerGroup;
}

public String getClientIp() {
return clientIp;
}

public String getInstanceName() {
return instanceName;
}

public String getZone() {
return zone;
}

public void setZone(String zone) {
this.zone = zone;
}

public String getProject() {
return project;
}

public void setProject(String project) {
this.project = project;
}

public String getEnvironment() {
return environment;
}

public void setEnvironment(String environment) {
this.environment = environment;
}

public String getApplication() {
return application;
}

public void setApplication(String application) {
this.application = application;
}

public String getService() {
return service;
}

public void setService(String service) {
this.service = service;
}

public Set<String> getSubscribedTopics() {
return subscribedTopics;
}

public void setSubscribedTopics(Set<String> subscribedTopics) {
this.subscribedTopics = subscribedTopics;
}

/**
* 添加订阅的topic
*
* @param topic 订阅的主题
*/
public void addSubscribedTopics(String topic) {
this.subscribedTopics.add(topic);
}

/**
* 移除取消订阅的topic
*
* @param topic 取消订阅的主题
*/
public void removeSubscribedTopics(String topic) {
this.subscribedTopics.remove(topic);
}
}
Loading