Skip to content

Commit

Permalink
rocketmq禁消费controller-wrapper、handler
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <1449308021@qq.com>
  • Loading branch information
daizhenyu committed Dec 18, 2023
1 parent f75dd1e commit 719e2ee
Show file tree
Hide file tree
Showing 12 changed files with 806 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,27 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>consumer-controller</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-agentcore-god</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.cache;

import com.huaweicloud.sermant.rocketmq.wrapper.DefaultLitePullConsumerWrapper;
import com.huaweicloud.sermant.rocketmq.wrapper.DefaultMqPushConsumerWrapper;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* rocketmq消费者缓存
*
* @author daizhenyu
* @since 2023-12-04
**/
public class RocketMqConsumerCache {
/**
* push消费者wrapper缓存
*/
public static final Set<DefaultMqPushConsumerWrapper> PUSH_CONSUMERS_CACHE =
new CopyOnWriteArraySet<>();

/**
* pull消费者wrapper缓存
*/
public static final Set<DefaultLitePullConsumerWrapper> PULL_CONSUMERS_CACHE =
new CopyOnWriteArraySet<>();

private RocketMqConsumerCache() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.extension;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;

/**
* RocketMqConsumerAssign处理器接口,供外部实现在rocketmq消费者指定队列消费时执行相应操作
*
* @author daizhenyu
* @since 2023-12-15
**/
public interface RocketMqConsumerAssignHandler {
/**
* 拦截点前置处理
*
* @param context 上下文信息
*/
void doBefore(ExecuteContext context);

/**
* 拦截点后置处理
*
* @param context 上下文信息
*/
void doAfter(ExecuteContext context);

/**
* 拦截点异常处理
*
* @param context 上下文信息
*/
void doOnThrow(ExecuteContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.extension;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;

/**
* RocketmqConsumerShutdown处理器接口,供外部实现在rocketmq消费者关闭时执行相应操作
*
* @author daizhenyu
* @since 2023-12-13
**/
public interface RocketMqConsumerShutdownHandler {
/**
* 拦截点前置处理
*
* @param context 上下文信息
*/
void doBefore(ExecuteContext context);

/**
* 拦截点后置处理
*
* @param context 上下文信息
*/
void doAfter(ExecuteContext context);

/**
* 拦截点异常处理
*
* @param context 上下文信息
*/
void doOnThrow(ExecuteContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.extension;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;

/**
* RocketmqConsumerStart处理器接口,供外部实现在rocketmq消费者启动时执行相应操作
*
* @author daizhenyu
* @since 2023-12-13
**/
public interface RocketMqConsumerStartHandler {
/**
* 拦截点前置处理
*
* @param context 上下文信息
*/
void doBefore(ExecuteContext context);

/**
* 拦截点后置处理
*
* @param context 上下文信息
*/
void doAfter(ExecuteContext context);

/**
* 拦截点异常处理
*
* @param context 上下文信息
*/
void doOnThrow(ExecuteContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.extension;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;

/**
* RocketmqConsumerSubscribe处理器接口,供外部实现在rocketmq消费者订阅时执行相应操作
*
* @author daizhenyu
* @since 2023-12-15
**/
public interface RocketMqConsumerSubscribeHandler {
/**
* 拦截点前置处理
*
* @param context 上下文信息
*/
void doBefore(ExecuteContext context);

/**
* 拦截点后置处理
*
* @param context 上下文信息
*/
void doAfter(ExecuteContext context);

/**
* 拦截点异常处理
*
* @param context 上下文信息
*/
void doOnThrow(ExecuteContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.rocketmq.extension;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;

/**
* RocketmqConsumerUnsubscribe处理器接口,供外部实现在rocketmq消费者取消订阅时执行相应操作
*
* @author daizhenyu
* @since 2023-12-15
**/
public interface RocketMqConsumerUnsubscribeHandler {
/**
* 拦截点前置处理
*
* @param context 上下文信息
*/
void doBefore(ExecuteContext context);

/**
* 拦截点后置处理
*
* @param context 上下文信息
*/
void doAfter(ExecuteContext context);

/**
* 拦截点异常处理
*
* @param context 上下文信息
*/
void doOnThrow(ExecuteContext context);
}
Loading

0 comments on commit 719e2ee

Please sign in to comment.