Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

优化springcloud双注册场景,DiscoveryClientInterceptor/DiscoveryClientServiceInterceptor处理逻辑 #1267

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,18 @@
import com.huawei.registry.support.InstanceInterceptorSupport;
import com.huawei.registry.utils.HostUtils;

import com.huaweicloud.sermant.core.common.LoggerFactory;
import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;

import reactor.core.publisher.Flux;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
Expand All @@ -49,41 +44,56 @@
* @since 2021-12-13
*/
public class DiscoveryClientInterceptor extends InstanceInterceptorSupport {
private static final Logger LOGGER = LoggerFactory.getLogger();
private static final String SERVICE_ID = "serviceId";

private static final String MICRO_SERVICE_INSTANCES = "microServiceInstances";

@Override
public ExecuteContext doBefore(ExecuteContext context) {
if (isMarked()) {
String serviceId = (String) context.getArguments()[0];
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<MicroServiceInstance> microServiceInstances = service.getServerList(serviceId);
if (microServiceInstances.isEmpty()) {
provenceee marked this conversation as resolved.
Show resolved Hide resolved
return context;
}
try {
mark();
String serviceId = (String) context.getArguments()[0];
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<MicroServiceInstance> microServiceInstances = service.getServerList(serviceId);
context.setLocalFieldValue(SERVICE_ID, serviceId);
context.setLocalFieldValue(MICRO_SERVICE_INSTANCES, microServiceInstances);
if (RegisterContext.INSTANCE.isAvailable()
&& !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
hanbingleixue marked this conversation as resolved.
Show resolved Hide resolved
return context;
}
final Object target = context.getObject();
context.skip(isWebfLux(target) ? Flux.fromIterable(Collections.emptyList())
: Collections.emptyList());
return context;
}

@Override
public ExecuteContext doAfter(ExecuteContext context) {
final String serviceId = (String) context.getLocalFieldValue(SERVICE_ID);
final List<MicroServiceInstance> microServiceInstances =
(List<MicroServiceInstance>) context.getLocalFieldValue(MICRO_SERVICE_INSTANCES);
if (microServiceInstances != null && !microServiceInstances.isEmpty()) {
final Object target = context.getObject();
if (!microServiceInstances.isEmpty()) {
context.skip(isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target)
: convertAndMerge(microServiceInstances, serviceId, target));
}
} finally {
unMark();
final Object contextResult = context.getResult();
context.changeResult(
isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target, contextResult)
: convertAndMerge(microServiceInstances, serviceId, target, contextResult));
hanbingleixue marked this conversation as resolved.
Show resolved Hide resolved
}
return context;
}

private Flux<ServiceInstance> convertAndMergeWithFlux(List<MicroServiceInstance> microServiceInstances,
String serviceId, Object target) {
return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target));
private Flux<ServiceInstance> convertAndMergeWithFlux(
List<MicroServiceInstance> microServiceInstances,
String serviceId, Object target, Object contextResult) {
return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target, contextResult));
}

private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microServiceInstances, String serviceId,
Object target) {
private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microServiceInstances,
String serviceId,
Object target, Object contextResult) {
List<ServiceInstance> result = new ArrayList<>(microServiceInstances.size());
if (RegisterContext.INSTANCE.isAvailable()
&& !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
result.addAll(queryOriginInstances(target, serviceId));
}
result.addAll(queryOriginInstances(target, contextResult));
for (MicroServiceInstance microServiceInstance : microServiceInstances) {
result.removeIf(originServiceInstance ->
HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(),
Expand All @@ -94,21 +104,15 @@ private List<ServiceInstance> convertAndMerge(List<MicroServiceInstance> microSe
return result.stream().filter(Objects::nonNull).collect(Collectors.toList());
}

private List<ServiceInstance> queryOriginInstances(Object target, String serviceId) {
try {
if (target instanceof CompositeDiscoveryClient) {
final CompositeDiscoveryClient discoveryClient = (CompositeDiscoveryClient) target;
return discoveryClient.getInstances(serviceId);
}
if (isWebfLux(target)) {
ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target;
final Flux<ServiceInstance> instances = reactiveDiscoveryClient.getInstances(serviceId);
return instances.collectList().block();
}
} catch (Exception exception) {
LOGGER.warning(String.format(Locale.ENGLISH,
"Query Instances from origin register center failed, may be it is not available! reason: %s",
exception.getMessage()));
private List<ServiceInstance> queryOriginInstances(Object target, Object contextResult) {
if (target instanceof CompositeDiscoveryClient) {
return contextResult == null ? Collections.emptyList() : (List<ServiceInstance>) contextResult;
}
if (isWebfLux(target)) {
List<ServiceInstance> resultList = new ArrayList<>();
final Flux<ServiceInstance> instances = (Flux<ServiceInstance>) contextResult;
instances.collectList().subscribe(resultList::addAll);
return resultList;
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;

import reactor.core.publisher.Flux;

import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -43,43 +40,43 @@
public class DiscoveryClientServiceInterceptor extends InstanceInterceptorSupport {
@Override
public ExecuteContext doBefore(ExecuteContext context) {
if (isMarked()) {
if (RegisterContext.INSTANCE.isAvailable()
&& !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
return context;
}
try {
mark();
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<String> services = new ArrayList<>(service.getServices());
final Object target = context.getObject();
context.skip(isWebfLux(target) ? getServicesWithFlux(services, target) : getServices(services, target));
} finally {
unMark();
}
final Object target = context.getObject();
context.skip(isWebfLux(target) ? Flux.fromIterable(Collections.emptyList()) : Collections.emptyList());
return context;
}

private Flux<String> getServicesWithFlux(List<String> services, Object target) {
return Flux.fromIterable(getServices(services, target));
@Override
public ExecuteContext doAfter(ExecuteContext context) {
final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class);
final List<String> services = new ArrayList<>(service.getServices());
final Object target = context.getObject();
final Object contextResult = context.getResult();
context.changeResult(
isWebfLux(target) ? getServicesWithFlux(services, target, contextResult) : getServices(services,
target, contextResult));
return context;
}

private List<String> getServices(List<String> services, Object target) {
if (!RegisterContext.INSTANCE.isAvailable()
|| RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) {
return services;
}
private Flux<String> getServicesWithFlux(List<String> services, Object target, Object contextResult) {
return Flux.fromIterable(getServices(services, target, contextResult));
}

private List<String> getServices(List<String> services, Object target, Object contextResult) {
// 合并两个注册中心
if (isWebfLux(target)) {
ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target;
final Flux<String> originServicesFlux = reactiveDiscoveryClient.getServices();
final List<String> originServices = originServicesFlux.collectList().block();
if (originServices == null) {
final Flux<String> originServicesFlux = (Flux<String>) contextResult;
final List<String> originServices = new ArrayList<>();
originServicesFlux.collectList().subscribe(originServices::addAll);
if (originServices.size() == 0) {
return services;
}
services.addAll(originServices);
} else {
final DiscoveryClient discoveryClient = (DiscoveryClient) target;
services.addAll(discoveryClient.getServices());
services.addAll((List<String>) contextResult);
}
return services.stream().distinct().collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager;
import com.huaweicloud.sermant.core.service.ServiceManager;
import com.huaweicloud.sermant.core.utils.ReflectUtils;

import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
Expand Down Expand Up @@ -69,8 +68,8 @@ private List<Server> convertAndMerge(DynamicServerListLoadBalancer<Server> serve
}
for (MicroServiceInstance microServiceInstance : microServiceInstances) {
result.removeIf(originServiceInstance ->
HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(),
microServiceInstance.getHost(), microServiceInstance.getPort()));
HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(),
microServiceInstance.getHost(), microServiceInstance.getPort()));
result.add(new ScServer(microServiceInstance, serverListLoadBalancer.getName()));
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,13 @@
* @since 2022-02-22
*/
public abstract class InstanceInterceptorSupport extends RegisterSwitchSupport {
private final ThreadLocal<Object> threadLocal = new ThreadLocal<>();

/**
* 类缓存, 避免多次调用loadClass
*/
private final Map<String, Class<?>> cacheClasses = new ConcurrentHashMap<>();

private RegisterConfig config;

/**
* 标记当前线程方法调用
* <p></p>
* 默认标记, 确保下游调用不会存在再次mark的场景
*/
protected final void mark() {
threadLocal.set(Boolean.TRUE);
}

/**
* 默认去除标记
*/
protected final void unMark() {
threadLocal.remove();
}

/**
* 判断是否被标记
*
* @return 是否被标记
*/
protected final boolean isMarked() {
return threadLocal.get() != null;
}

/**
* 是否开启注册中心迁移,双注册
*
Expand Down Expand Up @@ -100,7 +73,7 @@ protected final Class<?> getInstanceClass(String className) {
Class<?> result = null;
try {
result = ClassLoaderUtils.defineClass(className, contextClassLoader,
ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className));
ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className));
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException | IOException e) {
// 有可能已经加载过了,直接用contextClassLoader.loadClass加载
try {
Expand All @@ -117,17 +90,17 @@ protected final Class<?> getInstanceClass(String className) {
* 构建实例 由子类自行转换
*
* @param microServiceInstance 实例信息
* @param serviceName 服务名
* @param serviceName 服务名
* @return Object
*/
protected final Optional<Object> buildInstance(MicroServiceInstance microServiceInstance, String serviceName) {
final Class<?> serverClass = getInstanceClass(getInstanceClassName());
try {
Constructor<?> declaredConstructor = serverClass
.getDeclaredConstructor(MicroServiceInstance.class, String.class);
.getDeclaredConstructor(MicroServiceInstance.class, String.class);
return Optional.of(declaredConstructor.newInstance(microServiceInstance, serviceName));
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException ignored) {
| InvocationTargetException ignored) {
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static void init() {
RegisterServiceCommonConfig.class)).thenReturn(COMMON_CONFIG);
pluginServiceManagerMockedStatic = Mockito.mockStatic(PluginServiceManager.class);
serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class);

}

@AfterClass
Expand All @@ -79,7 +78,6 @@ public static void clear() {
}

/**
*
* @return 测试拦截器
*/
protected abstract T getInterceptor();
Expand All @@ -98,12 +96,29 @@ protected ExecuteContext buildContext() throws NoSuchMethodException {
* 构建基本的context
*
* @param arguments 参数
* @param target 对象
* @param target 对象
* @return context
* @throws NoSuchMethodException 不会抛出
*/
protected ExecuteContext buildContext(Object target, Object[] arguments) throws NoSuchMethodException {
return ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"),
arguments, null, null);
}

/**
* 构建基本的context
*
* @param arguments 参数
* @param target 对象
* @param result ExecuteContext的result
* @return context
* @throws NoSuchMethodException 不会抛出
*/
protected ExecuteContext buildContext(Object target, Object[] arguments, Object result)
throws NoSuchMethodException {
ExecuteContext context = ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"),
arguments, null, null);
context.changeResult(result);
return context;
}
}
Loading