Skip to content

Commit

Permalink
Merge pull request #1267 from TangLeDaily/issue1242_a
Browse files Browse the repository at this point in the history
优化springcloud双注册场景,DiscoveryClientInterceptor/DiscoveryClientServiceInterceptor处理逻辑
  • Loading branch information
Sherlockhan authored Aug 23, 2023
2 parents 4fc50af + 410a241 commit 459d747
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,18 @@
import com.huawei.registry.support.InstanceInterceptorSupport;
import com.huawei.registry.utils.HostUtils;

import com.huaweicloud.sermant.core.common.LoggerFactory;
import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;

import reactor.core.publisher.Flux;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
Expand All @@ -49,41 +44,56 @@
* @since 2021-12-13
*/
public class DiscoveryClientInterceptor extends InstanceInterceptorSupport {
private static final Logger LOGGER = LoggerFactory.getLogger();
private static final String SERVICE_ID = "serviceId";

private static final String MICRO_SERVICE_INSTANCES = "microServiceInstances";

@Override
public ExecuteContext doBefore(ExecuteContext context) {
if (isMarked()) {
String serviceId = (String) context.getArguments()[0];
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<MicroServiceInstance> microServiceInstances = service.getServerList(serviceId);
if (microServiceInstances.isEmpty()) {
return context;
}
try {
mark();
String serviceId = (String) context.getArguments()[0];
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<MicroServiceInstance> microServiceInstances = service.getServerList(serviceId);
context.setLocalFieldValue(SERVICE_ID, serviceId);
context.setLocalFieldValue(MICRO_SERVICE_INSTANCES, microServiceInstances);
if (RegisterContext.INSTANCE.isAvailable()
&& !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
return context;
}
final Object target = context.getObject();
context.skip(isWebfLux(target) ? Flux.fromIterable(Collections.emptyList())
: Collections.emptyList());
return context;
}

@Override
public ExecuteContext doAfter(ExecuteContext context) {
final String serviceId = (String) context.getLocalFieldValue(SERVICE_ID);
final List<MicroServiceInstance> microServiceInstances =
(List<MicroServiceInstance>) context.getLocalFieldValue(MICRO_SERVICE_INSTANCES);
if (microServiceInstances != null && !microServiceInstances.isEmpty()) {
final Object target = context.getObject();
if (!microServiceInstances.isEmpty()) {
context.skip(isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target)
: convertAndMerge(microServiceInstances, serviceId, target));
}
} finally {
unMark();
final Object contextResult = context.getResult();
context.changeResult(
isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target, contextResult)
: convertAndMerge(microServiceInstances, serviceId, target, contextResult));
}
return context;
}

private Flux<ServiceInstance> convertAndMergeWithFlux(List<MicroServiceInstance> microServiceInstances,
String serviceId, Object target) {
return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target));
private Flux<ServiceInstance> convertAndMergeWithFlux(
List<MicroServiceInstance> microServiceInstances,
String serviceId, Object target, Object contextResult) {
return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target, contextResult));
}

private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microServiceInstances, String serviceId,
Object target) {
private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microServiceInstances,
String serviceId,
Object target, Object contextResult) {
List<ServiceInstance> result = new ArrayList<>(microServiceInstances.size());
if (RegisterContext.INSTANCE.isAvailable()
&& !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
result.addAll(queryOriginInstances(target, serviceId));
}
result.addAll(queryOriginInstances(target, contextResult));
for (MicroServiceInstance microServiceInstance : microServiceInstances) {
result.removeIf(originServiceInstance ->
HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(),
Expand All @@ -94,21 +104,15 @@ private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microSe
return result.stream().filter(Objects::nonNull).collect(Collectors.toList());
}

private List<ServiceInstance> queryOriginInstances(Object target, String serviceId) {
try {
if (target instanceof CompositeDiscoveryClient) {
final CompositeDiscoveryClient discoveryClient = (CompositeDiscoveryClient) target;
return discoveryClient.getInstances(serviceId);
}
if (isWebfLux(target)) {
ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target;
final Flux<ServiceInstance> instances = reactiveDiscoveryClient.getInstances(serviceId);
return instances.collectList().block();
}
} catch (Exception exception) {
LOGGER.warning(String.format(Locale.ENGLISH,
"Query Instances from origin register center failed, may be it is not available! reason: %s",
exception.getMessage()));
private List<ServiceInstance> queryOriginInstances(Object target, Object contextResult) {
if (target instanceof CompositeDiscoveryClient) {
return contextResult == null ? Collections.emptyList() : (List<ServiceInstance>) contextResult;
}
if (isWebfLux(target)) {
List<ServiceInstance> resultList = new ArrayList<>();
final Flux<ServiceInstance> instances = (Flux<ServiceInstance>) contextResult;
instances.collectList().subscribe(resultList::addAll);
return resultList;
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;

import reactor.core.publisher.Flux;

import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -43,43 +40,43 @@
public class DiscoveryClientServiceInterceptor extends InstanceInterceptorSupport {
@Override
public ExecuteContext doBefore(ExecuteContext context) {
if (isMarked()) {
if (RegisterContext.INSTANCE.isAvailable()
&& !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
return context;
}
try {
mark();
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<String> services = new ArrayList<>(service.getServices());
final Object target = context.getObject();
context.skip(isWebfLux(target) ? getServicesWithFlux(services, target) : getServices(services, target));
} finally {
unMark();
}
final Object target = context.getObject();
context.skip(isWebfLux(target) ? Flux.fromIterable(Collections.emptyList()) : Collections.emptyList());
return context;
}

private Flux<String> getServicesWithFlux(List<String> services, Object target) {
return Flux.fromIterable(getServices(services, target));
@Override
public ExecuteContext doAfter(ExecuteContext context) {
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<String> services = new ArrayList<>(service.getServices());
final Object target = context.getObject();
final Object contextResult = context.getResult();
context.changeResult(
isWebfLux(target) ? getServicesWithFlux(services, target, contextResult) : getServices(services,
target, contextResult));
return context;
}

private List<String> getServices(List<String> services, Object target) {
if (!RegisterContext.INSTANCE.isAvailable()
|| RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
return services;
}
private Flux<String> getServicesWithFlux(List<String> services, Object target, Object contextResult) {
return Flux.fromIterable(getServices(services, target, contextResult));
}

private List<String> getServices(List<String> services, Object target, Object contextResult) {
// 合并两个注册中心
if (isWebfLux(target)) {
ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target;
final Flux<String> originServicesFlux = reactiveDiscoveryClient.getServices();
final List<String> originServices = originServicesFlux.collectList().block();
if (originServices == null) {
final Flux<String> originServicesFlux = (Flux<String>) contextResult;
final List<String> originServices = new ArrayList<>();
originServicesFlux.collectList().subscribe(originServices::addAll);
if (originServices.size() == 0) {
return services;
}
services.addAll(originServices);
} else {
final DiscoveryClient discoveryClient = (DiscoveryClient) target;
services.addAll(discoveryClient.getServices());
services.addAll((List<String>) contextResult);
}
return services.stream().distinct().collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;
import com.huaweicloud.sermant.core.utils.ReflectUtils;

import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
Expand Down Expand Up @@ -69,8 +68,8 @@ private List<Server> convertAndMerge(DynamicServerListLoadBalancer<Server> serve
}
for (MicroServiceInstance microServiceInstance : microServiceInstances) {
result.removeIf(originServiceInstance ->
HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(),
microServiceInstance.getHost(), microServiceInstance.getPort()));
HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(),
microServiceInstance.getHost(), microServiceInstance.getPort()));
result.add(new ScServer(microServiceInstance, serverListLoadBalancer.getName()));
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,13 @@
* @since 2022-02-22
*/
public abstract class InstanceInterceptorSupport extends RegisterSwitchSupport {
private final ThreadLocal<Object> threadLocal = new ThreadLocal<>();

/**
* 类缓存, 避免多次调用loadClass
*/
private final Map<String, Class<?>> cacheClasses = new ConcurrentHashMap<>();

private RegisterConfig config;

/**
* 标记当前线程方法调用
* <p></p>
* 默认标记, 确保下游调用不会存在再次mark的场景
*/
protected final void mark() {
threadLocal.set(Boolean.TRUE);
}

/**
* 默认去除标记
*/
protected final void unMark() {
threadLocal.remove();
}

/**
* 判断是否被标记
*
* @return 是否被标记
*/
protected final boolean isMarked() {
return threadLocal.get() != null;
}

/**
* 是否开启注册中心迁移,双注册
*
Expand Down Expand Up @@ -100,7 +73,7 @@ protected final Class<?> getInstanceClass(String className) {
Class<?> result = null;
try {
result = ClassLoaderUtils.defineClass(className, contextClassLoader,
ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className));
ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className));
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException | IOException e) {
// 有可能已经加载过了,直接用contextClassLoader.loadClass加载
try {
Expand All @@ -117,17 +90,17 @@ protected final Class<?> getInstanceClass(String className) {
* 构建实例 由子类自行转换
*
* @param microServiceInstance 实例信息
* @param serviceName 服务名
* @param serviceName 服务名
* @return Object
*/
protected final Optional<Object> buildInstance(MicroServiceInstance microServiceInstance, String serviceName) {
final Class<?> serverClass = getInstanceClass(getInstanceClassName());
try {
Constructor<?> declaredConstructor = serverClass
.getDeclaredConstructor(MicroServiceInstance.class, String.class);
.getDeclaredConstructor(MicroServiceInstance.class, String.class);
return Optional.of(declaredConstructor.newInstance(microServiceInstance, serviceName));
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException ignored) {
| InvocationTargetException ignored) {
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static void init() {
RegisterServiceCommonConfig.class)).thenReturn(COMMON_CONFIG);
pluginServiceManagerMockedStatic = Mockito.mockStatic(PluginServiceManager.class);
serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class);

}

@AfterClass
Expand All @@ -79,7 +78,6 @@ public static void clear() {
}

/**
*
* @return 测试拦截器
*/
protected abstract T getInterceptor();
Expand All @@ -98,12 +96,29 @@ protected ExecuteContext buildContext() throws NoSuchMethodException {
* 构建基本的context
*
* @param arguments 参数
* @param target 对象
* @param target 对象
* @return context
* @throws NoSuchMethodException 不会抛出
*/
protected ExecuteContext buildContext(Object target, Object[] arguments) throws NoSuchMethodException {
return ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"),
arguments, null, null);
}

/**
* 构建基本的context
*
* @param arguments 参数
* @param target 对象
* @param result ExecuteContext的result
* @return context
* @throws NoSuchMethodException 不会抛出
*/
protected ExecuteContext buildContext(Object target, Object[] arguments, Object result)
throws NoSuchMethodException {
ExecuteContext context = ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"),
arguments, null, null);
context.changeResult(result);
return context;
}
}
Loading

0 comments on commit 459d747

Please sign in to comment.