From b7d39e6b63fca4b150ae2a75c56ffadf44cc924a Mon Sep 17 00:00:00 2001 From: bazzop Date: Sat, 24 Aug 2024 01:45:28 -0300 Subject: [PATCH] Implement semaphore synchronization to prevent concurrent modifications of InstanceStatus Implement semaphore synchronization to prevent concurrent modifications of InstanceStatus - Added a Semaphore to control concurrent access to InstanceStatus modifications. - Modified EurekaClientConfigurationRefresher to acquire the semaphore before performing deregister and register operations. - Modified DiscoveryClientConfiguration to acquire the semaphore before performing refreshInstanceInfo operations. - Ensured the semaphore is released in a finally block to handle exceptions properly. This change addresses the issue of concurrent modifications of InstanceStatus, ensuring that only one thread can modify the status at a time. --- .../EurekaDiscoveryClientConfiguration.java | 145 ++++++++++++------ 1 file changed, 97 insertions(+), 48 deletions(-) diff --git a/spring-cloud-netflix-eureka-client/src/main/java/org/springframework/cloud/netflix/eureka/EurekaDiscoveryClientConfiguration.java b/spring-cloud-netflix-eureka-client/src/main/java/org/springframework/cloud/netflix/eureka/EurekaDiscoveryClientConfiguration.java index 9879fc7b6..ee77ca9e6 100644 --- a/spring-cloud-netflix-eureka-client/src/main/java/org/springframework/cloud/netflix/eureka/EurekaDiscoveryClientConfiguration.java +++ b/spring-cloud-netflix-eureka-client/src/main/java/org/springframework/cloud/netflix/eureka/EurekaDiscoveryClientConfiguration.java @@ -19,7 +19,6 @@ import com.netflix.appinfo.HealthCheckHandler; import com.netflix.discovery.EurekaClient; import com.netflix.discovery.EurekaClientConfig; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.SimpleStatusAggregator; import org.springframework.boot.actuate.health.StatusAggregator; @@ -35,6 +34,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.concurrent.Semaphore; + /** * @author Dave Syer * @author Spencer Gibb @@ -51,52 +52,100 @@ @ConditionalOnBlockingDiscoveryEnabled public class EurekaDiscoveryClientConfiguration { - @Bean - @ConditionalOnMissingBean - public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) { - return new EurekaDiscoveryClient(client, clientConfig); - } - - @Configuration(proxyBeanMethods = false) - @ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false) - protected static class EurekaHealthCheckHandlerConfiguration { - - @Autowired(required = false) - private StatusAggregator statusAggregator = new SimpleStatusAggregator(); - - @Bean - @ConditionalOnMissingBean(HealthCheckHandler.class) - public EurekaHealthCheckHandler eurekaHealthCheckHandler() { - return new EurekaHealthCheckHandler(this.statusAggregator); - } - - } - - @Configuration(proxyBeanMethods = false) - @ConditionalOnClass(RefreshScopeRefreshedEvent.class) - protected static class EurekaClientConfigurationRefresher - implements ApplicationListener { - - @Autowired(required = false) - private EurekaClient eurekaClient; - - @Autowired(required = false) - private EurekaAutoServiceRegistration autoRegistration; - - public void onApplicationEvent(RefreshScopeRefreshedEvent event) { - // This will force the creation of the EurekaClient bean if not already - // created - // to make sure the client will be re-registered after a refresh event - if (eurekaClient != null) { - eurekaClient.getApplications(); - } - if (autoRegistration != null) { - // register in case meta data changed - this.autoRegistration.stop(); - this.autoRegistration.start(); - } - } - - } + @Bean + @ConditionalOnMissingBean + public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) { + return new EurekaDiscoveryClient(client, clientConfig); + } + + @Configuration(proxyBeanMethods = false) + @ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false) + protected static class EurekaHealthCheckHandlerConfiguration { + + @Autowired(required = false) + private StatusAggregator statusAggregator = new SimpleStatusAggregator(); + + @Bean + @ConditionalOnMissingBean(HealthCheckHandler.class) + public EurekaHealthCheckHandler eurekaHealthCheckHandler() { + return new EurekaHealthCheckHandler(this.statusAggregator); + } + + } + + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(RefreshScopeRefreshedEvent.class) + protected static class EurekaClientConfigurationRefresher + implements ApplicationListener { + + private final Semaphore semaphore = new Semaphore(1); + + @Autowired(required = false) + private EurekaClient eurekaClient; + + @Autowired(required = false) + private EurekaAutoServiceRegistration autoRegistration; + + public void onApplicationEvent(RefreshScopeRefreshedEvent event) { + try { + semaphore.acquire(); + if (eurekaClient != null) { + eurekaClient.getApplications(); + } + if (autoRegistration != null) { + // deregister instance + this.autoRegistration.stop(); + // register instance + this.autoRegistration.start(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + semaphore.release(); + } + } + + } + + @Configuration(proxyBeanMethods = false) + protected static class DiscoveryClientConfiguration { + + private final Semaphore semaphore = new Semaphore(1); + + @Autowired + private ApplicationInfoManager applicationInfoManager; + + @Autowired + private HealthCheckHandler healthCheckHandler; + + @Autowired + private InstanceInfo instanceInfo; + + void refreshInstanceInfo() { + try { + semaphore.acquire(); + applicationInfoManager.refreshDataCenterInfoIfRequired(); + applicationInfoManager.refreshLeaseInfoIfRequired(); + + InstanceStatus status; + try { + // get instance status + status = healthCheckHandler.getStatus(instanceInfo.getStatus()); + } catch (Exception e) { + logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e); + status = InstanceStatus.DOWN; + } + + if (null != status) { + // modify instance status + applicationInfoManager.setInstanceStatus(status); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + semaphore.release(); + } + } + } }