Skip to content

Commit

Permalink
消息队列禁消费插件: kafka禁止消费控制器模块和字节码增强模块
Browse files Browse the repository at this point in the history
Signed-off-by: lilai <lilai23@foxmail.com>
  • Loading branch information
lilai23 committed Dec 18, 2023
1 parent f75dd1e commit 059b0a6
Show file tree
Hide file tree
Showing 27 changed files with 1,467 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.huaweicloud.sermant.core.common.LoggerFactory;
import com.huaweicloud.sermant.core.exception.NetInterfacesCheckException;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
Expand All @@ -43,6 +44,8 @@ public class NetworkUtils {

private static final String LOCAL_HOST_IP = "127.0.0.1";

private static final String EMPTY_STR = "";

private NetworkUtils() {
}

Expand Down Expand Up @@ -100,6 +103,47 @@ public static Optional<String> getHostName() {
} catch (UnknownHostException e) {
return Optional.empty();
}
}

/**
* 获取Linux下的IP地址
*
* @return IP地址
*/
public static String getMachineIp() {
try {
for (Enumeration<NetworkInterface> networkInterfaceEnumeration = NetworkInterface.getNetworkInterfaces();
networkInterfaceEnumeration.hasMoreElements(); ) {
NetworkInterface networkInterface = networkInterfaceEnumeration.nextElement();
String name = networkInterface.getName();
if (name.contains("docker") || name.contains("lo")) {
continue;
}
String ip = resolveNetworkIp(networkInterface);
if (!EMPTY_STR.equals(ip)) {
return ip;
}
}
} catch (SocketException exception) {
LOGGER.warning("An exception occurred while getting the machine's IP address.");
}
LOGGER.severe("Can not acquire correct instance ip , it will be replaced by local ip!");
return LOCAL_HOST_IP;
}

private static String resolveNetworkIp(NetworkInterface networkInterface) {
for (Enumeration<InetAddress> enumIpAddr = networkInterface.getInetAddresses();
enumIpAddr.hasMoreElements(); ) {
InetAddress inetAddress = enumIpAddr.nextElement();
if (!(inetAddress instanceof Inet4Address) || inetAddress.isLoopbackAddress()) {
continue;
}
String ipaddress = inetAddress.getHostAddress();
if (!EMPTY_STR.equals(ipaddress) && !LOCAL_HOST_IP.equals(ipaddress)) {
// 取第一个符合要求的IP
return ipaddress;
}
}
return EMPTY_STR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private void processDeleteEvent(DynamicConfigEvent event) {
}

private void executeProhibition() {
KafkaConsumerController.getConsumerCache()
KafkaConsumerController.getKafkaConsumerCache()
.forEach(obj -> KafkaConsumerController.disableConsumption(obj,
ProhibitionConfigManager.getKafkaProhibitionTopics()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,31 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>consumer-controller</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<kafka.client.version>2.7.0</kafka.client.version>
<collections4.version>4.4</collections4.version>
</properties>

<dependencies>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-agentcore-god</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
<version>${kafka.client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${collections4.version}</version>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@

package com.huaweicloud.sermant.kafka.cache;

import com.huaweicloud.sermant.core.config.ConfigManager;
import com.huaweicloud.sermant.core.plugin.config.ServiceMeta;
import com.huaweicloud.sermant.core.utils.NetworkUtils;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

Expand All @@ -39,11 +44,6 @@ public enum KafkaConsumerCache {
private final Set<KafkaConsumerWrapper> kafkaConsumerCache = new CopyOnWriteArraySet<>();

KafkaConsumerCache() {
init();
}

private void init() {

}

/**
Expand All @@ -60,7 +60,7 @@ public Set<KafkaConsumerWrapper> getCache() {
*
* @param kafkaConsumer 消费者实例
*/
public void updateCache(KafkaConsumer<?, ?> kafkaConsumer) {
public void addKafkaConsumer(KafkaConsumer<?, ?> kafkaConsumer) {
kafkaConsumerCache.add(convert(kafkaConsumer));
}

Expand All @@ -71,6 +71,18 @@ public void updateCache(KafkaConsumer<?, ?> kafkaConsumer) {
* @return 消费者包装实例
*/
private KafkaConsumerWrapper convert(KafkaConsumer<?, ?> kafkaConsumer) {
return new KafkaConsumerWrapper();
KafkaConsumerWrapper wrapper = new KafkaConsumerWrapper();
ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class);
wrapper.setKafkaConsumer(kafkaConsumer);
wrapper.setZone(serviceMeta.getZone());
wrapper.setProject(serviceMeta.getProject());
wrapper.setEnvironment(serviceMeta.getEnvironment());
wrapper.setApplication(serviceMeta.getApplication());
wrapper.setService(serviceMeta.getService());
wrapper.setServerAddress(NetworkUtils.getMachineIp());
wrapper.setOriginalTopics(new HashSet<>());
wrapper.setOriginalPartitions(new HashSet<>());
wrapper.setAssign(false);
return wrapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,143 @@

package com.huaweicloud.sermant.kafka.cache;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Set;

/**
* Kafka实例包装类
*
* @author lilai
* @since 2023-12-05
*/
public class KafkaConsumerWrapper {
private KafkaConsumer<?, ?> kafkaConsumer;

/**
* 宿主应用自身订阅的Topic
*/
private Set<String> originalTopics;

/**
* 是否使用assign方法指定订阅
*/
private boolean isAssign;

/**
* 使用assign方法指定的Topic和分区
*/
private Collection<TopicPartition> originalPartitions;

/**
* 当前消费者的服务所在可用区
*/
private String zone;

/**
* 当前消费者的服务所在可用区命名空间
*/
private String project;

/**
* 当前消费者的服务所在的环境
*/
private String environment;

/**
* 当前消费者的服务所在的应用
*/
private String application;

/**
* 当前消费者的所在服务的名称
*/
private String service;

/**
* 当前消费者的所在服务的IP
*/
private String serverAddress;

public KafkaConsumer<?, ?> getKafkaConsumer() {
return kafkaConsumer;
}

public void setKafkaConsumer(KafkaConsumer<?, ?> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
}

public Set<String> getOriginalTopics() {
return originalTopics;
}

public void setOriginalTopics(Set<String> originalTopics) {
this.originalTopics = originalTopics;
}

public String getZone() {
return zone;
}

public void setZone(String zone) {
this.zone = zone;
}

public String getProject() {
return project;
}

public void setProject(String project) {
this.project = project;
}

public String getEnvironment() {
return environment;
}

public void setEnvironment(String environment) {
this.environment = environment;
}

public String getApplication() {
return application;
}

public void setApplication(String application) {
this.application = application;
}

public String getService() {
return service;
}

public void setService(String service) {
this.service = service;
}

public String getServerAddress() {
return serverAddress;
}

public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}

public boolean isAssign() {
return isAssign;
}

public void setAssign(boolean assign) {
this.isAssign = assign;
}

public Collection<TopicPartition> getOriginalPartitions() {
return originalPartitions;
}

public void setOriginalPartitions(Collection<TopicPartition> originalPartitions) {
this.originalPartitions = originalPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import com.huaweicloud.sermant.kafka.cache.KafkaConsumerCache;
import com.huaweicloud.sermant.kafka.cache.KafkaConsumerWrapper;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

/**
* KafkaConsumer消费控制器
Expand All @@ -34,38 +38,53 @@ private KafkaConsumerController() {
}

/**
* 关闭消费
* 执行禁止消费
*
* @param kafkaConsumerWrapper 消费者包装实例
* @param topics 消费主题
* @param prohibitionTopics 消费主题
*/
public static void disableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set<String> topics) {
}
public static void disableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set<String> prohibitionTopics) {
Set<String> originalTopics = kafkaConsumerWrapper.getOriginalTopics();
Collection<TopicPartition> originalPartitions = kafkaConsumerWrapper.getOriginalPartitions();

/**
* 开启消费
*
* @param kafkaConsumerWrapper 消费者包装实例
* @param topics 消费主题
*/
public static void enableConsumption(KafkaConsumerWrapper kafkaConsumerWrapper, Set<String> topics) {
// 未订阅任何Topic,无需操作
if (originalTopics.size() == 0) {
return;
}

KafkaConsumer<?, ?> kafkaConsumer = kafkaConsumerWrapper.getKafkaConsumer();
Collection<String> subtractTopics = CollectionUtils.subtract(originalTopics, prohibitionTopics);
if (kafkaConsumerWrapper.isAssign()) {
kafkaConsumer.assign(originalPartitions.stream().filter(obj -> subtractTopics.contains(obj.topic()))
.collect(Collectors.toSet()));
}
kafkaConsumer.subscribe(subtractTopics);
}

/**
* 更新消费者缓存
* 新增消费者缓存
*
* @param kafkaConsumer 消费者实例
*/
public static void updateConsumerCache(KafkaConsumer<?, ?> kafkaConsumer) {
KafkaConsumerCache.INSTANCE.updateCache(kafkaConsumer);
public static void addKafkaConsumerCache(KafkaConsumer<?, ?> kafkaConsumer) {
KafkaConsumerCache.INSTANCE.addKafkaConsumer(kafkaConsumer);
}

/**
* 获取消费者缓存
*
* @return 消费者缓存
*/
public static Set<KafkaConsumerWrapper> getConsumerCache() {
public static Set<KafkaConsumerWrapper> getKafkaConsumerCache() {
return KafkaConsumerCache.INSTANCE.getCache();
}

/**
* 移除消费者缓存
*
* @param kafkaConsumerWrapper 消费者包装实例
*/
public static void removeKafkaConsumeCache(KafkaConsumerWrapper kafkaConsumerWrapper) {
KafkaConsumerCache.INSTANCE.getCache().remove(kafkaConsumerWrapper);
}
}
Loading

0 comments on commit 059b0a6

Please sign in to comment.