Skip to content

Commit

Permalink
Implement semaphore synchronization to prevent concurrent modificatio…
Browse files Browse the repository at this point in the history
…ns of InstanceStatus

Implement semaphore synchronization to prevent concurrent modifications of InstanceStatus

- Added a Semaphore to control concurrent access to InstanceStatus modifications.
- Modified EurekaClientConfigurationRefresher to acquire the semaphore before performing deregister and register operations.
- Modified DiscoveryClientConfiguration to acquire the semaphore before performing refreshInstanceInfo operations.
- Ensured the semaphore is released in a finally block to handle exceptions properly.

This change addresses the issue of concurrent modifications of InstanceStatus, ensuring that only one thread can modify the status at a time.
  • Loading branch information
bazzop authored Aug 24, 2024
1 parent 4da4de6 commit b7d39e6
Showing 1 changed file with 97 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.netflix.appinfo.HealthCheckHandler;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.SimpleStatusAggregator;
import org.springframework.boot.actuate.health.StatusAggregator;
Expand All @@ -35,6 +34,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Semaphore;

/**
* @author Dave Syer
* @author Spencer Gibb
Expand All @@ -51,52 +52,100 @@
@ConditionalOnBlockingDiscoveryEnabled
public class EurekaDiscoveryClientConfiguration {

@Bean
@ConditionalOnMissingBean
public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
return new EurekaDiscoveryClient(client, clientConfig);
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false)
protected static class EurekaHealthCheckHandlerConfiguration {

@Autowired(required = false)
private StatusAggregator statusAggregator = new SimpleStatusAggregator();

@Bean
@ConditionalOnMissingBean(HealthCheckHandler.class)
public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
return new EurekaHealthCheckHandler(this.statusAggregator);
}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RefreshScopeRefreshedEvent.class)
protected static class EurekaClientConfigurationRefresher
implements ApplicationListener<RefreshScopeRefreshedEvent> {

@Autowired(required = false)
private EurekaClient eurekaClient;

@Autowired(required = false)
private EurekaAutoServiceRegistration autoRegistration;

public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
// This will force the creation of the EurekaClient bean if not already
// created
// to make sure the client will be re-registered after a refresh event
if (eurekaClient != null) {
eurekaClient.getApplications();
}
if (autoRegistration != null) {
// register in case meta data changed
this.autoRegistration.stop();
this.autoRegistration.start();
}
}

}
@Bean
@ConditionalOnMissingBean
public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
return new EurekaDiscoveryClient(client, clientConfig);
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false)
protected static class EurekaHealthCheckHandlerConfiguration {

@Autowired(required = false)
private StatusAggregator statusAggregator = new SimpleStatusAggregator();

@Bean
@ConditionalOnMissingBean(HealthCheckHandler.class)
public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
return new EurekaHealthCheckHandler(this.statusAggregator);
}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RefreshScopeRefreshedEvent.class)
protected static class EurekaClientConfigurationRefresher
implements ApplicationListener<RefreshScopeRefreshedEvent> {

private final Semaphore semaphore = new Semaphore(1);

@Autowired(required = false)
private EurekaClient eurekaClient;

@Autowired(required = false)
private EurekaAutoServiceRegistration autoRegistration;

public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
try {
semaphore.acquire();
if (eurekaClient != null) {
eurekaClient.getApplications();
}
if (autoRegistration != null) {
// deregister instance
this.autoRegistration.stop();
// register instance
this.autoRegistration.start();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
}

}

@Configuration(proxyBeanMethods = false)
protected static class DiscoveryClientConfiguration {

private final Semaphore semaphore = new Semaphore(1);

@Autowired
private ApplicationInfoManager applicationInfoManager;

@Autowired
private HealthCheckHandler healthCheckHandler;

@Autowired
private InstanceInfo instanceInfo;

void refreshInstanceInfo() {
try {
semaphore.acquire();
applicationInfoManager.refreshDataCenterInfoIfRequired();
applicationInfoManager.refreshLeaseInfoIfRequired();

InstanceStatus status;
try {
// get instance status
status = healthCheckHandler.getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}

if (null != status) {
// modify instance status
applicationInfoManager.setInstanceStatus(status);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
}
}

}

0 comments on commit b7d39e6

Please sign in to comment.