Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Sep 27, 2024
1 parent 626e45a commit 2d504d3
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,6 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) {
customizedViewPipeline, rebalancePipeline);
registry.register(ClusterEventType.CustomizeStateConfigChange, dataRefresh, dataPreprocess,
customizedViewPipeline, rebalancePipeline);
// gspencer todo: limit to just refresh instance and instance configs??
registry.register(ClusterEventType.ParticipantDeregistration, dataRefresh, participantDeregistrationPipeline);
return registry;
}
Expand Down Expand Up @@ -1131,6 +1130,8 @@ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
if (liveInstances == null) {
liveInstances = Collections.emptyList();
}

// Go though the live instance list and make sure that we are observing them
// accordingly. The action is done regardless of the paused flag.
if (changeContext.getType() == NotificationContext.Type.INIT
|| changeContext.getType() == NotificationContext.Type.CALLBACK) {
Expand Down Expand Up @@ -1648,7 +1649,6 @@ synchronized void closeRebalancer() {
}
}


Timer _participantDeregistrationTimer = null;
AtomicReference<ParticipantDeregistrationTask> _nextParticipantDeregistrationTask = new AtomicReference<>();
class ParticipantDeregistrationTask extends TimerTask {
Expand All @@ -1672,15 +1672,16 @@ public void run() {
}
}


public void scheduleParticipantDeregistration(long delay) {
logger.info("Scheduled deregister participants task for cluster {}.", _helixManager.getClusterName());
long currentTime = System.currentTimeMillis();
long deregistrationTime = currentTime + delay;
// If delay > 0, check if there is already a task scheduled for earlier time.
// If delay == 0, immediately schedule the task
if (delay > 0) {
ParticipantDeregistrationTask preTask = _nextParticipantDeregistrationTask.get();
if (preTask != null && preTask.getNextDeregistrationTime() > currentTime
&& preTask.getNextDeregistrationTime() < deregistrationTime) {
&& preTask.getNextDeregistrationTime() <= deregistrationTime) {
// already have an earlier participant deregistration scheduled, no need to schedule again.
return;
}
Expand All @@ -1691,6 +1692,7 @@ public void scheduleParticipantDeregistration(long delay) {
logger.info("Scheduled deregister participants task for cluster {} to occur at time {}.",
_helixManager.getClusterName(), deregistrationTime);

// Cancel the previous task if it exists.
ParticipantDeregistrationTask preTask = _nextParticipantDeregistrationTask.getAndSet(newTask);
if (preTask != null) {
preTask.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,6 @@ private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
if (!_updateInstanceOfflineTime) {
return;
}

List<String> offlineNodes = new ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet());
offlineNodes.removeAll(_allLiveInstanceCache.getPropertyMap().keySet());
_instanceOfflineTimeMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,12 @@ public void execute(ClusterEvent event) throws Exception {
return;
}


// scan all offlineTimeMap:
// Create list of nodes to deregister - can we batch deregister these?
// detect the smallest difference between now() and offlineTime + delay
// probably need to use same now() time for all nodes so there is not condition where we miss a node because time elapsed between deregister check and queue check
// schedule participantDeregistration for that earliest time


ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
// gpsencer TODO: bypass any nodes not in cluster? Or should we let the deregister logic no op on them
Map<String, Long> offlineTimeMap = cache.getInstanceOfflineTimeMap();
long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout();
long stageStartTime = System.currentTimeMillis();
Set<String> participantsToDeregister = new HashSet<>();
Long earliestDeregisterTime = Long.MAX_VALUE;
long earliestDeregisterTime = Long.MAX_VALUE;


for (Map.Entry<String, Long> entry : offlineTimeMap.entrySet()) {
Expand All @@ -62,20 +53,29 @@ public void execute(ClusterEvent event) throws Exception {
continue;
}

// If deregister time is in the past, deregister the instance
if (deregisterTime <= stageStartTime) {
participantsToDeregister.add(instanceName);
} else {
// Otherwise, find the next earliest deregister time
if (deregisterTime < earliestDeregisterTime) {
earliestDeregisterTime = deregisterTime;
}
}
}

Set<String> deregisteredParticipants = deregisterParticipants(manager, cache, participantsToDeregister);
System.out.println("successfully deregistered: " + deregisteredParticipants);
if (!participantsToDeregister.isEmpty()) {
Set<String> successfullyDeregisteredParticipants =
deregisterParticipants(manager, cache, participantsToDeregister);
if (!successfullyDeregisteredParticipants.isEmpty()) {
LOG.info("Successfully deregistered {} participants from cluster {}",
successfullyDeregisteredParticipants.size(), cache.getClusterName());
}
}
// Schedule the next deregister task
if (earliestDeregisterTime != Long.MAX_VALUE) {
long delay = earliestDeregisterTime - stageStartTime;
scheduleParticipantDeregistration(manager.getClusterName(), participantsToDeregister, delay);
scheduleParticipantDeregistration(manager.getClusterName(), delay);
}
}

Expand All @@ -88,24 +88,21 @@ private Set<String> deregisterParticipants(HelixManager manager, ResourceControl
return successfullyDeregisteredInstances;
}

// Perform safety checks before deregistering the instances
for (String instanceName : instancesToDeregister) {
// check if instance is alive or does not have INSTANCEOPERATION = UNKNOWN
// if either true, then no-op
// if both false, then call admin API to drop the participant
InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName);
LiveInstance liveInstance = cache.getLiveInstances().get(instanceName);

if (instanceConfig == null ) {
if (instanceConfig == null) {
LOG.debug("Instance config is null for instance {}, skip deregistering the instance", instanceName);
continue;
}

if (liveInstance != null && liveInstance.isValid()) {
if (liveInstance != null) {
LOG.debug("Instance {} is still alive, skip deregistering the instance", instanceName);
continue;
}

LOG.info("Deregistering instance {} from cluster {}", instanceName, cache.getClusterName());
try {
manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), instanceConfig);
successfullyDeregisteredInstances.add(instanceName);
Expand All @@ -117,7 +114,7 @@ private Set<String> deregisterParticipants(HelixManager manager, ResourceControl
return successfullyDeregisteredInstances;
}

private void scheduleParticipantDeregistration(String clusterName, Set<String> instancesToCheck, long delay) {
private void scheduleParticipantDeregistration(String clusterName, long delay) {
if (clusterName == null) {
LOG.error("Failed to schedule deregister participants. Cluster name is null.");
return;
Expand All @@ -127,8 +124,8 @@ private void scheduleParticipantDeregistration(String clusterName, Set<String> i
if (leaderController != null) {
leaderController.scheduleParticipantDeregistration(delay);
} else {
LOG.error("Failed to schedule deregister participants task for instances {}. "
+ "Controller for cluster {} does not exist.", instancesToCheck, clusterName);
LOG.error("Failed to schedule deregister participants task. Controller for cluster {} does not exist.",
clusterName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class TestParticipantDeregistrationStage extends ZkTestBase {
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
private static final int NUM_NODES = 5;
private List<MockParticipantManager> _participants = new ArrayList<>();
private BestPossibleExternalViewVerifier _bestPossibleClusterVerifier;
private HelixAdmin _admin;
private HelixDataAccessor _dataAccessor;
private ClusterControllerManager _controller;
Expand All @@ -41,10 +40,6 @@ public void beforeClass() {
addParticipant(CLUSTER_NAME, instanceName);
}

_bestPossibleClusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
.setZkAddr(ZK_ADDR).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();

_configAccessor = new ConfigAccessor(_gZkClient);

// start controller
Expand Down Expand Up @@ -94,7 +89,7 @@ public void testReconnectedParticipantNotDeregisteredWhenLive() throws Exception
Thread.sleep(DEREGISTER_TIMEOUT * 3/5);

// Manually recreate live instance so controller thinks it's back online
// This should unschedule the registration
// This should prevent the node from being deregistered
_dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()),
liveInstance);

Expand All @@ -105,18 +100,19 @@ public void testReconnectedParticipantNotDeregisteredWhenLive() throws Exception
.contains(participantToDeregister.getInstanceName()), "Participant should not have been deregistered");
}

// Re kill and assert that the instance is deregistered
_dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()));
boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME)
.contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION);
Assert.assertTrue(result, "Participant should have been deregistered");
Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+ participantToDeregister + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME));

dropParticipant(CLUSTER_NAME, participantToDeregister);
addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
}


// Same assertions as above but this time the node is re-killed immediately after being added back
@Test (dependsOnMethods = "testReconnectedParticipantNotDeregisteredWhenLive")
public void testFlappingParticipantIsNotDeregistered() throws Exception {
Expand Down Expand Up @@ -154,60 +150,86 @@ public void testFlappingParticipantIsNotDeregistered() throws Exception {
_dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()));
boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME)
.contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION);
Assert.assertTrue(result, "Participant should have been deregistered");
Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+ participantToDeregister + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME));

dropParticipant(CLUSTER_NAME, participantToDeregister);
addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
}

// Tests enabling deregister will trigger deregister for participants that were already offline
@Test (dependsOnMethods = "testFlappingParticipantIsNotDeregistered")
public void testClusterConfigChangeImmediatelyTriggersDeregistration() throws Exception {
public void testDeregisterAfterConfigEnabled() throws Exception {
System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));

// Set to deregister to disabled
long testDeregisterTimeout = 1000;
setAutoDeregisterConfigs(CLUSTER_NAME, false, testDeregisterTimeout);

// Create and immediately kill participants
List<MockParticipantManager> killedParticipants = new ArrayList<>();
for (int i = 0; i < 5; i++) {
MockParticipantManager participantToKill = addParticipant(CLUSTER_NAME, "participants_to_kill_" + i);
participantToKill.syncStop();
killedParticipants.add(participantToKill);
}

// Sleep so that participant offline time exceeds deregister timeout
Thread.sleep(testDeregisterTimeout);
// Trigger on disable --> enable deregister
setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout);

// Assert participants have been deregistered
boolean result = TestHelper.verify(() -> {
List<String> instances = _admin.getInstancesInCluster(CLUSTER_NAME);
return killedParticipants.stream().noneMatch(participant -> instances.contains(participant.getInstanceName()));
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(result, "Participants should have been deregistered");

// Trigger on enable --> enable with shorter deregister
setAutoDeregisterConfigs(CLUSTER_NAME, true, 1000000);
Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+ killedParticipants + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME));

List<MockParticipantManager> replacementParticipants = new ArrayList<>();
//reset dead participants
// reset cluster state
killedParticipants.forEach(participant -> {
dropParticipant(CLUSTER_NAME, participant);
MockParticipantManager replacement = addParticipant(CLUSTER_NAME, participant.getInstanceName());
replacement.syncStop();
replacementParticipants.add(replacement);
});
}

Thread.sleep(testDeregisterTimeout*5);
setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout);
// Tests shortening deregister timeout will trigger deregister and also deregister participants that now exceed
// the new (shorter) timeout
@Test (dependsOnMethods = "testDeregisterAfterConfigEnabled")
public void testDeregisterAfterConfigTimeoutShortened() throws Exception {
long longDeregisterTimeout = 1000*60*60*24;
long shortDeregisterTimeout = 1000;
setAutoDeregisterConfigs(CLUSTER_NAME, true, longDeregisterTimeout);

result = TestHelper.verify(() -> {
// Create and immediately kill participants
List<MockParticipantManager> killedParticipants = new ArrayList<>();
for (int i = 0; i < 5; i++) {
MockParticipantManager participantToKill = addParticipant(CLUSTER_NAME, "participants_to_kill_" + i);
participantToKill.syncStop();
killedParticipants.add(participantToKill);
}

// Sleep so that participant offline time exceeds deregister timeout
Thread.sleep(shortDeregisterTimeout);

// Trigger on shorten deregister timeout
setAutoDeregisterConfigs(CLUSTER_NAME, true, shortDeregisterTimeout);

// Assert participants have been deregistered
boolean result = TestHelper.verify(() -> {
List<String> instances = _admin.getInstancesInCluster(CLUSTER_NAME);
return replacementParticipants.stream().noneMatch(participant -> instances.contains(participant.getInstanceName()));
return killedParticipants.stream().noneMatch(participant -> instances.contains(participant.getInstanceName()));
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(result, "Participants should have been deregistered");
Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+ killedParticipants + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME));

// reset cluster state
killedParticipants.forEach(participant -> {
dropParticipant(CLUSTER_NAME, participant);
});
}

@Override
Expand Down

0 comments on commit 2d504d3

Please sign in to comment.