diff --git a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientInterceptor.java b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientInterceptor.java index d67ce75b82..15b660ddec 100644 --- a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientInterceptor.java +++ b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientInterceptor.java @@ -26,18 +26,15 @@ import com.huaweicloud.sermant.core.common.LoggerFactory; import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager; -import com.huaweicloud.sermant.core.service.ServiceManager; import reactor.core.publisher.Flux; import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Objects; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -53,37 +50,41 @@ public class DiscoveryClientInterceptor extends InstanceInterceptorSupport { @Override public ExecuteContext doBefore(ExecuteContext context) { - if (isMarked()) { - return context; - } - try { - mark(); - String serviceId = (String) context.getArguments()[0]; - final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class); - final List microServiceInstances = service.getServerList(serviceId); + if (!RegisterContext.INSTANCE.isAvailable() + || RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) { final Object target = context.getObject(); - if (!microServiceInstances.isEmpty()) { - context.skip(isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target) - : convertAndMerge(microServiceInstances, serviceId, target)); - } - } finally { - unMark(); + context.skip(isWebfLux(target) ? Flux.fromIterable(Collections.emptyList()) + : Collections.emptyList()); + } + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + String serviceId = (String) context.getArguments()[0]; + final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class); + final List microServiceInstances = service.getServerList(serviceId); + final Object target = context.getObject(); + final Object contextResult = context.getResult(); + if (!microServiceInstances.isEmpty()) { + context.changeResult( + isWebfLux(target) ? convertAndMergeWithFlux(microServiceInstances, serviceId, target, contextResult) + : convertAndMerge(microServiceInstances, serviceId, target, contextResult)); } return context; } - private Flux convertAndMergeWithFlux(List microServiceInstances, - String serviceId, Object target) { - return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target)); + private Flux convertAndMergeWithFlux( + List microServiceInstances, + String serviceId, Object target, Object contextResult) { + return Flux.fromIterable(convertAndMerge(microServiceInstances, serviceId, target, contextResult)); } - private List convertAndMerge(List microServiceInstances, String serviceId, - Object target) { + private List convertAndMerge(List microServiceInstances, + String serviceId, + Object target, Object contextResult) { List result = new ArrayList<>(microServiceInstances.size()); - if (RegisterContext.INSTANCE.isAvailable() - && !RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) { - result.addAll(queryOriginInstances(target, serviceId)); - } + result.addAll(queryOriginInstances(target, contextResult)); for (MicroServiceInstance microServiceInstance : microServiceInstances) { result.removeIf(originServiceInstance -> HostUtils.isSameInstance(originServiceInstance.getHost(), originServiceInstance.getPort(), @@ -94,21 +95,15 @@ private List convertAndMerge(List microSe return result.stream().filter(Objects::nonNull).collect(Collectors.toList()); } - private List queryOriginInstances(Object target, String serviceId) { - try { - if (target instanceof CompositeDiscoveryClient) { - final CompositeDiscoveryClient discoveryClient = (CompositeDiscoveryClient) target; - return discoveryClient.getInstances(serviceId); - } - if (isWebfLux(target)) { - ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target; - final Flux instances = reactiveDiscoveryClient.getInstances(serviceId); - return instances.collectList().block(); - } - } catch (Exception exception) { - LOGGER.warning(String.format(Locale.ENGLISH, - "Query Instances from origin register center failed, may be it is not available! reason: %s", - exception.getMessage())); + private List queryOriginInstances(Object target, Object contextResult) { + if (target instanceof CompositeDiscoveryClient) { + return contextResult == null ? Collections.emptyList() : (List) contextResult; + } + if (isWebfLux(target)) { + List resultList = new ArrayList<>(); + final Flux instances = (Flux) contextResult; + instances.collectList().subscribe(resultList::addAll); + return resultList; } return Collections.emptyList(); } diff --git a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptor.java b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptor.java index cce454700b..e8c2a2191b 100644 --- a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptor.java +++ b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptor.java @@ -23,13 +23,9 @@ import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager; -import com.huaweicloud.sermant.core.service.ServiceManager; import reactor.core.publisher.Flux; -import org.springframework.cloud.client.discovery.DiscoveryClient; -import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; - import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -43,26 +39,33 @@ public class DiscoveryClientServiceInterceptor extends InstanceInterceptorSupport { @Override public ExecuteContext doBefore(ExecuteContext context) { - if (isMarked()) { - return context; - } - try { - mark(); + if (!RegisterContext.INSTANCE.isAvailable() + || RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) { final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class); final List services = new ArrayList<>(service.getServices()); final Object target = context.getObject(); - context.skip(isWebfLux(target) ? getServicesWithFlux(services, target) : getServices(services, target)); - } finally { - unMark(); + context.skip(isWebfLux(target) ? Flux.fromIterable(services) : services); } return context; } - private Flux getServicesWithFlux(List services, Object target) { - return Flux.fromIterable(getServices(services, target)); + @Override + public ExecuteContext doAfter(ExecuteContext context) { + final RegisterCenterService service = PluginServiceManager.getPluginService(RegisterCenterService.class); + final List services = new ArrayList<>(service.getServices()); + final Object target = context.getObject(); + final Object contextResult = context.getResult(); + context.changeResult( + isWebfLux(target) ? getServicesWithFlux(services, target, contextResult) : getServices(services, + target, contextResult)); + return context; + } + + private Flux getServicesWithFlux(List services, Object target, Object contextResult) { + return Flux.fromIterable(getServices(services, target, contextResult)); } - private List getServices(List services, Object target) { + private List getServices(List services, Object target, Object contextResult) { if (!RegisterContext.INSTANCE.isAvailable() || RegisterDynamicConfig.INSTANCE.isNeedCloseOriginRegisterCenter()) { return services; @@ -70,16 +73,15 @@ private List getServices(List services, Object target) { // 合并两个注册中心 if (isWebfLux(target)) { - ReactiveDiscoveryClient reactiveDiscoveryClient = (ReactiveDiscoveryClient) target; - final Flux originServicesFlux = reactiveDiscoveryClient.getServices(); - final List originServices = originServicesFlux.collectList().block(); - if (originServices == null) { + final Flux originServicesFlux = (Flux) contextResult; + final List originServices = new ArrayList<>(); + originServicesFlux.collectList().subscribe(originServices::addAll); + if (originServices.size() == 0) { return services; } services.addAll(originServices); } else { - final DiscoveryClient discoveryClient = (DiscoveryClient) target; - services.addAll(discoveryClient.getServices()); + services.addAll((List) contextResult); } return services.stream().distinct().collect(Collectors.toList()); } diff --git a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/support/InstanceInterceptorSupport.java b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/support/InstanceInterceptorSupport.java index ecf78cad4d..57fb393dfa 100644 --- a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/support/InstanceInterceptorSupport.java +++ b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/main/java/com/huawei/registry/support/InstanceInterceptorSupport.java @@ -38,8 +38,6 @@ * @since 2022-02-22 */ public abstract class InstanceInterceptorSupport extends RegisterSwitchSupport { - private final ThreadLocal threadLocal = new ThreadLocal<>(); - /** * 类缓存, 避免多次调用loadClass */ @@ -47,31 +45,6 @@ public abstract class InstanceInterceptorSupport extends RegisterSwitchSupport { private RegisterConfig config; - /** - * 标记当前线程方法调用 - *

- * 默认标记, 确保下游调用不会存在再次mark的场景 - */ - protected final void mark() { - threadLocal.set(Boolean.TRUE); - } - - /** - * 默认去除标记 - */ - protected final void unMark() { - threadLocal.remove(); - } - - /** - * 判断是否被标记 - * - * @return 是否被标记 - */ - protected final boolean isMarked() { - return threadLocal.get() != null; - } - /** * 是否开启注册中心迁移,双注册 * @@ -100,7 +73,7 @@ protected final Class getInstanceClass(String className) { Class result = null; try { result = ClassLoaderUtils.defineClass(className, contextClassLoader, - ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className)); + ClassLoaderUtils.getClassResource(this.getClass().getClassLoader(), className)); } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException | IOException e) { // 有可能已经加载过了,直接用contextClassLoader.loadClass加载 try { @@ -117,17 +90,17 @@ protected final Class getInstanceClass(String className) { * 构建实例 由子类自行转换 * * @param microServiceInstance 实例信息 - * @param serviceName 服务名 + * @param serviceName 服务名 * @return Object */ protected final Optional buildInstance(MicroServiceInstance microServiceInstance, String serviceName) { final Class serverClass = getInstanceClass(getInstanceClassName()); try { Constructor declaredConstructor = serverClass - .getDeclaredConstructor(MicroServiceInstance.class, String.class); + .getDeclaredConstructor(MicroServiceInstance.class, String.class); return Optional.of(declaredConstructor.newInstance(microServiceInstance, serviceName)); } catch (NoSuchMethodException | InstantiationException | IllegalAccessException - | InvocationTargetException ignored) { + | InvocationTargetException ignored) { return Optional.empty(); } } diff --git a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/BaseRegistryTest.java b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/BaseRegistryTest.java index db2b4bb0c6..5b7aff7728 100644 --- a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/BaseRegistryTest.java +++ b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/BaseRegistryTest.java @@ -64,7 +64,6 @@ public static void init() { RegisterServiceCommonConfig.class)).thenReturn(COMMON_CONFIG); pluginServiceManagerMockedStatic = Mockito.mockStatic(PluginServiceManager.class); serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class); - } @AfterClass @@ -79,7 +78,6 @@ public static void clear() { } /** - * * @return 测试拦截器 */ protected abstract T getInterceptor(); @@ -98,7 +96,7 @@ protected ExecuteContext buildContext() throws NoSuchMethodException { * 构建基本的context * * @param arguments 参数 - * @param target 对象 + * @param target 对象 * @return context * @throws NoSuchMethodException 不会抛出 */ @@ -106,4 +104,21 @@ protected ExecuteContext buildContext(Object target, Object[] arguments) throws return ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"), arguments, null, null); } + + /** + * 构建基本的context + * + * @param arguments 参数 + * @param target 对象 + * @param result ExecuteContext的result + * @return context + * @throws NoSuchMethodException 不会抛出 + */ + protected ExecuteContext buildContext(Object target, Object[] arguments, Object result) + throws NoSuchMethodException { + ExecuteContext context = ExecuteContext.forMemberMethod(target, String.class.getDeclaredMethod("trim"), + arguments, null, null); + context.changeResult(result); + return context; + } } diff --git a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientInterceptorTest.java b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientInterceptorTest.java index 988a8b42cd..3cb41666d4 100644 --- a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientInterceptorTest.java +++ b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientInterceptorTest.java @@ -25,6 +25,7 @@ import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; import com.huaweicloud.sermant.core.plugin.service.PluginServiceManager; import com.huaweicloud.sermant.core.service.ServiceManager; +import com.huaweicloud.sermant.core.utils.CollectionUtils; import org.junit.Assert; import org.junit.Before; @@ -36,9 +37,12 @@ import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient; import org.springframework.cloud.client.discovery.composite.reactive.ReactiveCompositeDiscoveryClient; import org.springframework.cloud.zookeeper.discovery.ZookeeperServiceInstance; + import reactor.core.publisher.Flux; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; /** @@ -49,6 +53,7 @@ */ public class DiscoveryClientInterceptorTest extends BaseRegistryTest { private final List instanceList = new ArrayList<>(); + private final List zkInstanceList = new ArrayList<>(); private final String serviceName = "test"; @@ -73,8 +78,6 @@ public void setUp() throws Exception { zkInstanceList.add(new ZookeeperServiceInstance(serviceName, test.buildZkInstance(8003))); zkInstanceList.add(new ZookeeperServiceInstance(serviceName, test.buildZkInstance(8004))); Mockito.when(registerCenterService.getServerList(serviceName)).thenReturn(instanceList); - Mockito.when(client.getInstances(serviceName)).thenReturn(zkInstanceList); - Mockito.when(reactiveDiscoveryClient.getInstances(serviceName)).thenReturn(Flux.fromIterable(zkInstanceList)); } @Test @@ -83,18 +86,26 @@ public void doBefore() throws NoSuchMethodException { REGISTER_CONFIG.setEnableSpringRegister(true); REGISTER_CONFIG.setOpenMigration(true); RegisterDynamicConfig.INSTANCE.setClose(false); - final ExecuteContext context = interceptor.doBefore(buildContext(client, new Object[]{serviceName})); - Assert.assertTrue(context.isSkip()); - Assert.assertTrue(context.getResult() instanceof List); - Assert.assertEquals(((List) context.getResult()).size(), zkInstanceList.size() + instanceList.size()); - - final ExecuteContext fluxContext = interceptor.doBefore(buildContext(reactiveDiscoveryClient, new Object[]{serviceName})); - Assert.assertTrue(fluxContext.isSkip()); - Assert.assertTrue(fluxContext.getResult() instanceof Flux); - final List block = ((Flux) fluxContext.getResult()).collectList().block(); + final ExecuteContext context = interceptor.doBefore( + buildContext(client, new Object[]{serviceName}, zkInstanceList)); + final ExecuteContext contextResult = interceptor.doAfter(context); + Assert.assertTrue(contextResult.getResult() instanceof List); + Assert.assertEquals(((List) contextResult.getResult()).size(), zkInstanceList.size() + instanceList.size()); + + final ExecuteContext fluxContext = interceptor.doBefore(buildContext(reactiveDiscoveryClient, + new Object[]{serviceName}, Flux.fromIterable(zkInstanceList))); + final ExecuteContext fluxContextResult = interceptor.doAfter(fluxContext); + Assert.assertTrue(fluxContextResult.getResult() instanceof Flux); + final List block = ((Flux) fluxContextResult.getResult()).collectList().block(); Assert.assertNotNull(block); Assert.assertEquals(block.size(), zkInstanceList.size() + instanceList.size()); + RegisterContext.INSTANCE.setAvailable(false); + final ExecuteContext NotAvailableContext = interceptor.doBefore( + buildContext(client, new Object[]{serviceName})); + Assert.assertTrue(NotAvailableContext.isSkip()); + Assert.assertTrue(CollectionUtils.isEmpty((Collection) NotAvailableContext.getResult())); + REGISTER_CONFIG.setEnableSpringRegister(false); REGISTER_CONFIG.setOpenMigration(false); } diff --git a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptorTest.java b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptorTest.java index 1fcd79451e..3b23257cc7 100644 --- a/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptorTest.java +++ b/sermant-plugins/sermant-service-registry/spring-cloud-registry-plugin/src/test/java/com/huawei/registry/interceptors/DiscoveryClientServiceInterceptorTest.java @@ -34,6 +34,7 @@ import org.mockito.MockitoAnnotations; import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient; import org.springframework.cloud.client.discovery.composite.reactive.ReactiveCompositeDiscoveryClient; + import reactor.core.publisher.Flux; import java.util.ArrayList; @@ -67,8 +68,6 @@ public void setUp() throws Exception { services.add("test1"); services.add("test2"); Mockito.when(registerCenterService.getServices()).thenReturn(services); - Mockito.when(client.getServices()).thenReturn(originServices); - Mockito.when(reactiveCompositeDiscoveryClient.getServices()).thenReturn(Flux.fromIterable(originServices)); } @After @@ -85,19 +84,22 @@ public void doBefore() throws NoSuchMethodException { RegisterContext.INSTANCE.setAvailable(true); REGISTER_CONFIG.setEnableSpringRegister(true); REGISTER_CONFIG.setOpenMigration(true); - final ExecuteContext context = interceptor.doBefore(buildContext(client, null)); - Assert.assertTrue(context.isSkip()); + final ExecuteContext context = interceptor.doAfter(buildContext(client, null, originServices)); Assert.assertTrue(context.getResult() instanceof List); Assert.assertEquals(((List) context.getResult()).size(), originServices.size() + services.size()); // 测试flux - final ExecuteContext fluxContext = interceptor.doBefore(buildContext(reactiveCompositeDiscoveryClient, null)); - Assert.assertTrue(fluxContext.isSkip()); + final ExecuteContext fluxContext = interceptor.doAfter( + buildContext(reactiveCompositeDiscoveryClient, null, Flux.fromIterable(originServices))); Assert.assertTrue(fluxContext.getResult() instanceof Flux); final List block = ((Flux) fluxContext.getResult()).collectList().block(); Assert.assertNotNull(block); Assert.assertEquals(block.size(), originServices.size() + services.size()); + RegisterContext.INSTANCE.setAvailable(false); + final ExecuteContext NotAvailableContext = interceptor.doBefore( + buildContext(client, null)); + Assert.assertTrue(NotAvailableContext.isSkip()); REGISTER_CONFIG.setEnableSpringRegister(false); REGISTER_CONFIG.setOpenMigration(false); }