Skip to content

Commit

Permalink
消息队列禁消费插件: kafka禁止消费控制器模块
Browse files Browse the repository at this point in the history
Signed-off-by: lilai <lilai23@foxmail.com>
  • Loading branch information
lilai23 committed Dec 18, 2023
1 parent f75dd1e commit d69e14b
Show file tree
Hide file tree
Showing 13 changed files with 426 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.huaweicloud.sermant.core.common.LoggerFactory;
import com.huaweicloud.sermant.core.exception.NetInterfacesCheckException;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
Expand All @@ -43,6 +44,8 @@ public class NetworkUtils {

private static final String LOCAL_HOST_IP = "127.0.0.1";

private static final String EMPTY_STR = "";

private NetworkUtils() {
}

Expand Down Expand Up @@ -100,6 +103,47 @@ public static Optional<String> getHostName() {
} catch (UnknownHostException e) {
return Optional.empty();
}
}

/**
* 获取Linux下的IP地址
*
* @return IP地址
*/
public static String getMachineIp() {
try {
for (Enumeration<NetworkInterface> networkInterfaceEnumeration = NetworkInterface.getNetworkInterfaces();
networkInterfaceEnumeration.hasMoreElements(); ) {
NetworkInterface networkInterface = networkInterfaceEnumeration.nextElement();
String name = networkInterface.getName();
if (name.contains("docker") || name.contains("lo")) {
continue;
}
String ip = resolveNetworkIp(networkInterface);
if (!EMPTY_STR.equals(ip)) {
return ip;
}
}
} catch (SocketException exception) {
LOGGER.warning("An exception occurred while getting the machine's IP address.");
}
LOGGER.severe("Can not acquire correct instance ip , it will be replaced by local ip!");
return LOCAL_HOST_IP;
}

private static String resolveNetworkIp(NetworkInterface networkInterface) {
for (Enumeration<InetAddress> enumIpAddr = networkInterface.getInetAddresses();
enumIpAddr.hasMoreElements(); ) {
InetAddress inetAddress = enumIpAddr.nextElement();
if (!(inetAddress instanceof Inet4Address) || inetAddress.isLoopbackAddress()) {
continue;
}
String ipaddress = inetAddress.getHostAddress();
if (!EMPTY_STR.equals(ipaddress) && !LOCAL_HOST_IP.equals(ipaddress)) {
// 取第一个符合要求的IP
return ipaddress;
}
}
return EMPTY_STR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ public void process(DynamicConfigEvent event) {
private void processCreateOrUpdateEvent(DynamicConfigEvent event) {
if (GLOBAL_CONFIG_KEY.equals(event.getKey())) {
ProhibitionConfigManager.updateGlobalConfig(yaml.loadAs(event.getContent(), ProhibitionConfig.class));
executeProhibition();
markProhibition();
}
if ((LOCAL_CONFIG_KEY_PREFIX + ConfigManager.getConfig(ServiceMeta.class).getService()).equals(
event.getKey())) {
ProhibitionConfigManager.updateLocalConfig(yaml.loadAs(event.getContent(), ProhibitionConfig.class));
executeProhibition();
markProhibition();
}
LOGGER.info(String.format(Locale.ROOT, "Update mq-consume-prohibition config, current config: %s",
ProhibitionConfigManager.printConfig()));
Expand All @@ -110,21 +110,20 @@ private void processCreateOrUpdateEvent(DynamicConfigEvent event) {
private void processDeleteEvent(DynamicConfigEvent event) {
if (GLOBAL_CONFIG_KEY.equals(event.getKey())) {
ProhibitionConfigManager.updateGlobalConfig(new ProhibitionConfig());
executeProhibition();
markProhibition();
}
if ((LOCAL_CONFIG_KEY_PREFIX + ConfigManager.getConfig(ServiceMeta.class).getService()).equals(
event.getKey())) {
ProhibitionConfigManager.updateLocalConfig(new ProhibitionConfig());
executeProhibition();
markProhibition();
}
LOGGER.info(String.format(Locale.ROOT, "Delete mq-consume-prohibition config, current config: %s",
ProhibitionConfigManager.printConfig()));
}

private void executeProhibition() {
KafkaConsumerController.getConsumerCache()
.forEach(obj -> KafkaConsumerController.disableConsumption(obj,
ProhibitionConfigManager.getKafkaProhibitionTopics()));
private void markProhibition() {
KafkaConsumerController.getKafkaConsumerCache().values()
.forEach(obj -> obj.getIsNeedExecute().set(true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,31 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>consumer-controller</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<kafka.client.version>2.7.0</kafka.client.version>
<collections4.version>4.4</collections4.version>
</properties>

<dependencies>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-agentcore-god</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
<version>${kafka.client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${collections4.version}</version>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@

package com.huaweicloud.sermant.kafka.cache;

import com.huaweicloud.sermant.core.config.ConfigManager;
import com.huaweicloud.sermant.core.plugin.config.ServiceMeta;
import com.huaweicloud.sermant.core.utils.NetworkUtils;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* KafkaConsumer缓存
Expand All @@ -36,22 +42,17 @@ public enum KafkaConsumerCache {
/**
* 消费者缓存
*/
private final Set<KafkaConsumerWrapper> kafkaConsumerCache = new CopyOnWriteArraySet<>();
private final Map<Integer, KafkaConsumerWrapper> kafkaConsumerCache = new ConcurrentHashMap<>();

KafkaConsumerCache() {
init();
}

private void init() {

}

/**
* 获取消费者缓存
*
* @return 消费者缓存
*/
public Set<KafkaConsumerWrapper> getCache() {
public Map<Integer, KafkaConsumerWrapper> getCache() {
return kafkaConsumerCache;
}

Expand All @@ -60,8 +61,8 @@ public Set<KafkaConsumerWrapper> getCache() {
*
* @param kafkaConsumer 消费者实例
*/
public void updateCache(KafkaConsumer<?, ?> kafkaConsumer) {
kafkaConsumerCache.add(convert(kafkaConsumer));
public void addKafkaConsumer(KafkaConsumer<?, ?> kafkaConsumer) {
kafkaConsumerCache.put(kafkaConsumer.hashCode(), convert(kafkaConsumer));
}

/**
Expand All @@ -71,6 +72,19 @@ public void updateCache(KafkaConsumer<?, ?> kafkaConsumer) {
* @return 消费者包装实例
*/
private KafkaConsumerWrapper convert(KafkaConsumer<?, ?> kafkaConsumer) {
return new KafkaConsumerWrapper();
KafkaConsumerWrapper wrapper = new KafkaConsumerWrapper();
ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class);
wrapper.setKafkaConsumer(kafkaConsumer);
wrapper.setZone(serviceMeta.getZone());
wrapper.setProject(serviceMeta.getProject());
wrapper.setEnvironment(serviceMeta.getEnvironment());
wrapper.setApplication(serviceMeta.getApplication());
wrapper.setService(serviceMeta.getService());
wrapper.setServerAddress(NetworkUtils.getMachineIp());
wrapper.setOriginalTopics(new HashSet<>());
wrapper.setOriginalPartitions(new HashSet<>());
wrapper.setAssign(false);
wrapper.setNeedExecute(new AtomicBoolean(false));
return wrapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,157 @@

package com.huaweicloud.sermant.kafka.cache;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Kafka实例包装类
*
* @author lilai
* @since 2023-12-05
*/
public class KafkaConsumerWrapper {
private KafkaConsumer<?, ?> kafkaConsumer;

/**
* 宿主应用自身订阅的Topic
*/
private Set<String> originalTopics;

/**
* 是否使用assign方法指定订阅
*/
private boolean isAssign;

/**
* 使用assign方法指定的Topic和分区
*/
private Collection<TopicPartition> originalPartitions;

/**
* 是否需要在poll前处理执行禁止消费
*/
private AtomicBoolean isNeedExecute;

/**
* 当前消费者的服务所在可用区
*/
private String zone;

/**
* 当前消费者的服务所在可用区命名空间
*/
private String project;

/**
* 当前消费者的服务所在的环境
*/
private String environment;

/**
* 当前消费者的服务所在的应用
*/
private String application;

/**
* 当前消费者的所在服务的名称
*/
private String service;

/**
* 当前消费者的所在服务的IP
*/
private String serverAddress;

public KafkaConsumer<?, ?> getKafkaConsumer() {
return kafkaConsumer;
}

public void setKafkaConsumer(KafkaConsumer<?, ?> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
}

public Set<String> getOriginalTopics() {
return originalTopics;
}

public void setOriginalTopics(Set<String> originalTopics) {
this.originalTopics = originalTopics;
}

public String getZone() {
return zone;
}

public void setZone(String zone) {
this.zone = zone;
}

public String getProject() {
return project;
}

public void setProject(String project) {
this.project = project;
}

public String getEnvironment() {
return environment;
}

public void setEnvironment(String environment) {
this.environment = environment;
}

public String getApplication() {
return application;
}

public void setApplication(String application) {
this.application = application;
}

public String getService() {
return service;
}

public void setService(String service) {
this.service = service;
}

public String getServerAddress() {
return serverAddress;
}

public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}

public boolean isAssign() {
return isAssign;
}

public void setAssign(boolean assign) {
this.isAssign = assign;
}

public Collection<TopicPartition> getOriginalPartitions() {
return originalPartitions;
}

public void setOriginalPartitions(Collection<TopicPartition> originalPartitions) {
this.originalPartitions = originalPartitions;
}

public AtomicBoolean getIsNeedExecute() {
return isNeedExecute;
}

public void setNeedExecute(AtomicBoolean needExecute) {
this.isNeedExecute = needExecute;
}
}
Loading

0 comments on commit d69e14b

Please sign in to comment.