Skip to content

Commit

Permalink
【fix】优化springcloud双注册场景,DiscoveryClientInterceptor/DiscoveryClientSer…
Browse files Browse the repository at this point in the history
…viceInterceptor的处理逻辑
  • Loading branch information
TangLeDaily committed Aug 2, 2023
1 parent 4427972 commit 65ef1ab
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,15 @@
import com.huaweicloud.sermant.core.common.LoggerFactory;
import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;

import reactor.core.publisher.Flux;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -53,37 +50,41 @@ public class DiscoveryClientInterceptor extends InstanceInterceptorSupport {

@Override
public ExecuteContext doBefore(ExecuteContext context) {
if (isMarked()) {
return context;
}
try {
mark();
String serviceId = (String) context.getArguments()[0];
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<MicroServiceInstance> microServiceInstances = service.getServerList(serviceId);
if (!RegisterContext.INSTANCE.isAvailable()
|| RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
final Object target = context.getObject();
if (!microServiceInstances.isEmpty()) {
context.skip(isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target)
: convertAndMerge(microServiceInstances, serviceId, target));
}
} finally {
unMark();
context.skip(isWebfLux(target) ? Flux.fromIterable(Collections.emptyList())
: Collections.emptyList());
}
return context;
}

@Override
public ExecuteContext doAfter(ExecuteContext context) {
String serviceId = (String) context.getArguments()[0];
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<MicroServiceInstance> microServiceInstances = service.getServerList(serviceId);
final Object target = context.getObject();
final Object contextResult = context.getResult();
if (!microServiceInstances.isEmpty()) {
context.changeResult(
isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target, contextResult)
: convertAndMerge(microServiceInstances, serviceId, target, contextResult));
}
return context;
}

private Flux<ServiceInstance> convertAndMergeWithFlux(List<MicroServiceInstance> microServiceInstances,
String serviceId, Object target) {
return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target));
private Flux<ServiceInstance> convertAndMergeWithFlux(
List<MicroServiceInstance> microServiceInstances,
String serviceId, Object target, Object contextResult) {
return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target, contextResult));
}

private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microServiceInstances, String serviceId,
Object target) {
private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microServiceInstances,
String serviceId,
Object target, Object contextResult) {
List<ServiceInstance> result = new ArrayList<>(microServiceInstances.size());
if (RegisterContext.INSTANCE.isAvailable()
&& !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
result.addAll(queryOriginInstances(target, serviceId));
}
result.addAll(queryOriginInstances(target, contextResult));
for (MicroServiceInstance microServiceInstance : microServiceInstances) {
result.removeIf(originServiceInstance ->
HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(),
Expand All @@ -94,21 +95,15 @@ private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microSe
return result.stream().filter(Objects::nonNull).collect(Collectors.toList());
}

private List<ServiceInstance> queryOriginInstances(Object target, String serviceId) {
try {
if (target instanceof CompositeDiscoveryClient) {
final CompositeDiscoveryClient discoveryClient = (CompositeDiscoveryClient) target;
return discoveryClient.getInstances(serviceId);
}
if (isWebfLux(target)) {
ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target;
final Flux<ServiceInstance> instances = reactiveDiscoveryClient.getInstances(serviceId);
return instances.collectList().block();
}
} catch (Exception exception) {
LOGGER.warning(String.format(Locale.ENGLISH,
"Query Instances from origin register center failed, may be it is not available! reason: %s",
exception.getMessage()));
private List<ServiceInstance> queryOriginInstances(Object target, Object contextResult) {
if (target instanceof CompositeDiscoveryClient) {
return contextResult == null ? Collections.emptyList() : (List<ServiceInstance>) contextResult;
}
if (isWebfLux(target)) {
List<ServiceInstance> resultList = new ArrayList<>();
final Flux<ServiceInstance> instances = (Flux<ServiceInstance>) contextResult;
instances.collectList().subscribe(resultList::addAll);
return resultList;
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;

import reactor.core.publisher.Flux;

import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -43,43 +39,49 @@
public class DiscoveryClientServiceInterceptor extends InstanceInterceptorSupport {
@Override
public ExecuteContext doBefore(ExecuteContext context) {
if (isMarked()) {
return context;
}
try {
mark();
if (!RegisterContext.INSTANCE.isAvailable()
|| RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<String> services = new ArrayList<>(service.getServices());
final Object target = context.getObject();
context.skip(isWebfLux(target) ? getServicesWithFlux(services, target) : getServices(services, target));
} finally {
unMark();
context.skip(isWebfLux(target) ? Flux.fromIterable(services) : services);
}
return context;
}

private Flux<String> getServicesWithFlux(List<String> services, Object target) {
return Flux.fromIterable(getServices(services, target));
@Override
public ExecuteContext doAfter(ExecuteContext context) {
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<String> services = new ArrayList<>(service.getServices());
final Object target = context.getObject();
final Object contextResult = context.getResult();
context.changeResult(
isWebfLux(target) ? getServicesWithFlux(services, target, contextResult) : getServices(services,
target, contextResult));
return context;
}

private Flux<String> getServicesWithFlux(List<String> services, Object target, Object contextResult) {
return Flux.fromIterable(getServices(services, target, contextResult));
}

private List<String> getServices(List<String> services, Object target) {
private List<String> getServices(List<String> services, Object target, Object contextResult) {
if (!RegisterContext.INSTANCE.isAvailable()
|| RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
return services;
}

// 合并两个注册中心
if (isWebfLux(target)) {
ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target;
final Flux<String> originServicesFlux = reactiveDiscoveryClient.getServices();
final List<String> originServices = originServicesFlux.collectList().block();
if (originServices == null) {
final Flux<String> originServicesFlux = (Flux<String>) contextResult;
final List<String> originServices = new ArrayList<>();
originServicesFlux.collectList().subscribe(originServices::addAll);
if (originServices.size() == 0) {
return services;
}
services.addAll(originServices);
} else {
final DiscoveryClient discoveryClient = (DiscoveryClient) target;
services.addAll(discoveryClient.getServices());
services.addAll((List<String>) contextResult);
}
return services.stream().distinct().collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,13 @@
* @since 2022-02-22
*/
public abstract class InstanceInterceptorSupport extends RegisterSwitchSupport {
private final ThreadLocal<Object> threadLocal = new ThreadLocal<>();

/**
* 类缓存, 避免多次调用loadClass
*/
private final Map<String, Class<?>> cacheClasses = new ConcurrentHashMap<>();

private RegisterConfig config;

/**
* 标记当前线程方法调用
* <p></p>
* 默认标记, 确保下游调用不会存在再次mark的场景
*/
protected final void mark() {
threadLocal.set(Boolean.TRUE);
}

/**
* 默认去除标记
*/
protected final void unMark() {
threadLocal.remove();
}

/**
* 判断是否被标记
*
* @return 是否被标记
*/
protected final boolean isMarked() {
return threadLocal.get() != null;
}

/**
* 是否开启注册中心迁移,双注册
*
Expand Down Expand Up @@ -100,7 +73,7 @@ protected final Class<?> getInstanceClass(String className) {
Class<?> result = null;
try {
result = ClassLoaderUtils.defineClass(className, contextClassLoader,
ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className));
ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className));
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException | IOException e) {
// 有可能已经加载过了,直接用contextClassLoader.loadClass加载
try {
Expand All @@ -117,17 +90,17 @@ protected final Class<?> getInstanceClass(String className) {
* 构建实例 由子类自行转换
*
* @param microServiceInstance 实例信息
* @param serviceName 服务名
* @param serviceName 服务名
* @return Object
*/
protected final Optional<Object> buildInstance(MicroServiceInstance microServiceInstance, String serviceName) {
final Class<?> serverClass = getInstanceClass(getInstanceClassName());
try {
Constructor<?> declaredConstructor = serverClass
.getDeclaredConstructor(MicroServiceInstance.class, String.class);
.getDeclaredConstructor(MicroServiceInstance.class, String.class);
return Optional.of(declaredConstructor.newInstance(microServiceInstance, serviceName));
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException ignored) {
| InvocationTargetException ignored) {
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static void init() {
RegisterServiceCommonConfig.class)).thenReturn(COMMON_CONFIG);
pluginServiceManagerMockedStatic = Mockito.mockStatic(PluginServiceManager.class);
serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class);

}

@AfterClass
Expand All @@ -79,7 +78,6 @@ public static void clear() {
}

/**
*
* @return 测试拦截器
*/
protected abstract T getInterceptor();
Expand All @@ -98,12 +96,29 @@ protected ExecuteContext buildContext() throws NoSuchMethodException {
* 构建基本的context
*
* @param arguments 参数
* @param target 对象
* @param target 对象
* @return context
* @throws NoSuchMethodException 不会抛出
*/
protected ExecuteContext buildContext(Object target, Object[] arguments) throws NoSuchMethodException {
return ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"),
arguments, null, null);
}

/**
* 构建基本的context
*
* @param arguments 参数
* @param target 对象
* @param result ExecuteContext的result
* @return context
* @throws NoSuchMethodException 不会抛出
*/
protected ExecuteContext buildContext(Object target, Object[] arguments, Object result)
throws NoSuchMethodException {
ExecuteContext context = ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"),
arguments, null, null);
context.changeResult(result);
return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;
import com.huaweicloud.sermant.core.utils.CollectionUtils;

import org.junit.Assert;
import org.junit.Before;
Expand All @@ -36,9 +37,12 @@
import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient;
import org.springframework.cloud.client.discovery.composite.reactive.ReactiveCompositeDiscoveryClient;
import org.springframework.cloud.zookeeper.discovery.ZookeeperServiceInstance;

import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
Expand All @@ -49,6 +53,7 @@
*/
public class DiscoveryClientInterceptorTest extends BaseRegistryTest<DiscoveryClientInterceptor> {
private final List<MicroServiceInstance> instanceList = new ArrayList<>();

private final List<ServiceInstance> zkInstanceList = new ArrayList<>();

private final String serviceName = "test";
Expand All @@ -73,8 +78,6 @@ public void setUp() throws Exception {
zkInstanceList.add(new ZookeeperServiceInstance(serviceName, test.buildZkInstance(8003)));
zkInstanceList.add(new ZookeeperServiceInstance(serviceName, test.buildZkInstance(8004)));
Mockito.when(registerCenterService.getServerList(serviceName)).thenReturn(instanceList);
Mockito.when(client.getInstances(serviceName)).thenReturn(zkInstanceList);
Mockito.when(reactiveDiscoveryClient.getInstances(serviceName)).thenReturn(Flux.fromIterable(zkInstanceList));
}

@Test
Expand All @@ -83,18 +86,26 @@ public void doBefore() throws NoSuchMethodException {
REGISTER_CONFIG.setEnableSpringRegister(true);
REGISTER_CONFIG.setOpenMigration(true);
RegisterDynamicConfig.INSTANCE.setClose(false);
final ExecuteContext context = interceptor.doBefore(buildContext(client, new Object[]{serviceName}));
Assert.assertTrue(context.isSkip());
Assert.assertTrue(context.getResult() instanceof List);
Assert.assertEquals(((List<?>) context.getResult()).size(), zkInstanceList.size() + instanceList.size());

final ExecuteContext fluxContext = interceptor.doBefore(buildContext(reactiveDiscoveryClient, new Object[]{serviceName}));
Assert.assertTrue(fluxContext.isSkip());
Assert.assertTrue(fluxContext.getResult() instanceof Flux);
final List<?> block = ((Flux<?>) fluxContext.getResult()).collectList().block();
final ExecuteContext context = interceptor.doBefore(
buildContext(client, new Object[]{serviceName}, zkInstanceList));
final ExecuteContext contextResult = interceptor.doAfter(context);
Assert.assertTrue(contextResult.getResult() instanceof List);
Assert.assertEquals(((List<?>) contextResult.getResult()).size(), zkInstanceList.size() + instanceList.size());

final ExecuteContext fluxContext = interceptor.doBefore(buildContext(reactiveDiscoveryClient,
new Object[]{serviceName}, Flux.fromIterable(zkInstanceList)));
final ExecuteContext fluxContextResult = interceptor.doAfter(fluxContext);
Assert.assertTrue(fluxContextResult.getResult() instanceof Flux);
final List<?> block = ((Flux<?>) fluxContextResult.getResult()).collectList().block();
Assert.assertNotNull(block);
Assert.assertEquals(block.size(), zkInstanceList.size() + instanceList.size());

RegisterContext.INSTANCE.setAvailable(false);
final ExecuteContext NotAvailableContext = interceptor.doBefore(
buildContext(client, new Object[]{serviceName}));
Assert.assertTrue(NotAvailableContext.isSkip());
Assert.assertTrue(CollectionUtils.isEmpty((Collection<?>) NotAvailableContext.getResult()));

REGISTER_CONFIG.setEnableSpringRegister(false);
REGISTER_CONFIG.setOpenMigration(false);
}
Expand Down
Loading

0 comments on commit 65ef1ab

Please sign in to comment.