From 626e45a79cec4c6ad43253c8216cdb163da87c1b Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Thu, 26 Sep 2024 18:06:43 -0700 Subject: [PATCH] refactor --- .../controller/GenericHelixController.java | 137 ++++++----------- .../BaseControllerDataProvider.java | 1 + .../controller/stages/ClusterEventType.java | 1 + .../ParticipantDeregistrationStage.java | 114 ++++++++++++-- .../util/ParticipantDeregistrationUtil.java | 97 ------------ .../TestParticipantDeregistrationStage.java | 143 ++++++++++-------- ...eWhenRequireDelayedRebalanceOverwrite.java | 8 +- .../integration/TestForceKillInstance.java | 38 ++--- 8 files changed, 250 insertions(+), 289 deletions(-) delete mode 100644 helix-core/src/main/java/org/apache/helix/util/ParticipantDeregistrationUtil.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 0b6efc1d05..028e0529ac 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -111,7 +111,6 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.mbeans.ClusterEventMonitor; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; -import org.apache.helix.util.ParticipantDeregistrationUtil; import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -548,7 +547,10 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) { // backward compatibility check Pipeline liveInstancePipeline = new Pipeline(pipelineName); liveInstancePipeline.addStage(new CompatibilityCheckStage()); - liveInstancePipeline.addStage(new ParticipantDeregistrationStage()); + + // Participant Deregistration Pipeline + Pipeline participantDeregistrationPipeline = new Pipeline(pipelineName); + participantDeregistrationPipeline.addStage(new ParticipantDeregistrationStage()); // auto-exit maintenance mode if applicable Pipeline autoExitMaintenancePipeline = new Pipeline(pipelineName); @@ -564,11 +566,11 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) { rebalancePipeline); registry .register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline, - dataPreprocess, rebalancePipeline); + dataPreprocess, rebalancePipeline, participantDeregistrationPipeline); registry .register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline, liveInstancePipeline, dataPreprocess, externalViewPipeline, customizedViewPipeline, - rebalancePipeline); + rebalancePipeline, participantDeregistrationPipeline); registry .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, @@ -588,6 +590,8 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) { customizedViewPipeline, rebalancePipeline); registry.register(ClusterEventType.CustomizeStateConfigChange, dataRefresh, dataPreprocess, customizedViewPipeline, rebalancePipeline); + // gspencer todo: limit to just refresh instance and instance configs?? + registry.register(ClusterEventType.ParticipantDeregistration, dataRefresh, participantDeregistrationPipeline); return registry; } } @@ -615,6 +619,10 @@ private static PipelineRegistry createTaskRegistry(String pipelineName) { rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new TaskMessageDispatchStage()); + // Participant Deregistration Pipeline + Pipeline participantDeregistrationPipeline = new Pipeline(pipelineName); + participantDeregistrationPipeline.addStage(new ParticipantDeregistrationStage()); + // backward compatibility check Pipeline liveInstancePipeline = new Pipeline(pipelineName); liveInstancePipeline.addStage(new CompatibilityCheckStage()); @@ -708,8 +716,8 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR _onDemandRebalanceTimer = new Timer("GenericHelixController_" + _clusterName + "_onDemand_Timer", true); - _deregisterParticipantsTimer - = new Timer("GenericHelixController_" + _clusterName + "_deregisterParticipants_Timer", true); + _participantDeregistrationTimer + = new Timer("GenericHelixController_" + _clusterName + "_participantDeregistration_Timer", true); // TODO: refactor to simplify below similar code of the 3 pipelines // initialize pipelines at the end so we have everything else prepared @@ -1123,28 +1131,6 @@ public void onLiveInstanceChange(List liveInstances, if (liveInstances == null) { liveInstances = Collections.emptyList(); } - - Set newlyOfflineInstanceNames = new HashSet<>(); - - synchronized (_lastSeenInstances) { - if (_lastSeenInstances.get() != null) { - // Record the last seen instances - newlyOfflineInstanceNames = _lastSeenInstances.get().values().stream() - .map(LiveInstance::getInstanceName) - .collect(Collectors.toSet()); - } - } - - // Find newly offline and newly online instances - Set currentLiveInstanceNames = liveInstances.stream() - .map(LiveInstance::getInstanceName) - .collect(Collectors.toSet()); - Set newlyOnlineInstanceNames = new HashSet<>(currentLiveInstanceNames); - newlyOnlineInstanceNames.removeAll(newlyOfflineInstanceNames); - newlyOfflineInstanceNames.removeAll(currentLiveInstanceNames); - - - // Go through the live instance list and make sure that we are observing them // accordingly. The action is done regardless of the paused flag. if (changeContext.getType() == NotificationContext.Type.INIT || changeContext.getType() == NotificationContext.Type.CALLBACK) { @@ -1156,11 +1142,9 @@ public void onLiveInstanceChange(List liveInstances, liveInstances = Collections.emptyList(); checkLiveInstancesObservation(liveInstances, changeContext); } - Map eventAttributes = new HashMap<>(); - eventAttributes.put(AttributeName.eventData.name(), liveInstances); - eventAttributes.put(AttributeName.NEWLY_OFFLINE_INSTANCE_NAMES.name(), newlyOfflineInstanceNames); - eventAttributes.put(AttributeName.NEWLY_ONLINE_INSTANCE_NAMES.name(), newlyOnlineInstanceNames); - pushToEventQueues(ClusterEventType.LiveInstanceChange, changeContext, eventAttributes); + + pushToEventQueues(ClusterEventType.LiveInstanceChange, changeContext, + Collections.singletonMap(AttributeName.eventData.name(), liveInstances)); logger.info( "END: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName); @@ -1664,75 +1648,52 @@ synchronized void closeRebalancer() { } } - Map> _deregisterParticipantsTasks = new HashMap<>(); - Timer _deregisterParticipantsTimer = null; - class DeregisterParticipantsTask extends TimerTask { + + Timer _participantDeregistrationTimer = null; + AtomicReference _nextParticipantDeregistrationTask = new AtomicReference<>(); + class ParticipantDeregistrationTask extends TimerTask { private final HelixManager _manager; - private final ResourceControllerDataProvider _cache; - private final Set _participantsToDeregister; + private final long _nextDeregistrationTime; - public DeregisterParticipantsTask(HelixManager manager, ResourceControllerDataProvider cache, - Set participantsToDeregister) { + public ParticipantDeregistrationTask(HelixManager manager, long delay) { _manager = manager; - _cache = cache; - _participantsToDeregister = participantsToDeregister; + _nextDeregistrationTime = System.currentTimeMillis() + delay; + } + + public long getNextDeregistrationTime() { + return _nextDeregistrationTime; } @Override public void run() { - if (_participantsToDeregister.isEmpty()) { - return; - } - - _cache.refresh(_manager.getHelixDataAccessor()); - ParticipantDeregistrationUtil.deregisterParticipants(_manager, _cache, _participantsToDeregister, true); + NotificationContext changeContext = new NotificationContext(_manager); + changeContext.setType(NotificationContext.Type.CALLBACK); + pushToEventQueues(ClusterEventType.ParticipantDeregistration, changeContext, Collections.EMPTY_MAP); } } - public void scheduleDeregisterParticipantsTask(Set participantsToDeregister, long delay) { - if (participantsToDeregister == null || participantsToDeregister.isEmpty() || delay < 0) { - return; - } - - //gspencer TODO: remove print debug statements - System.out.println("Tasks scheduled before adding: " + _deregisterParticipantsTasks); - DeregisterParticipantsTask newTask = new DeregisterParticipantsTask(_helixManager, _resourceControlDataProvider, - participantsToDeregister); - for (String participant : participantsToDeregister) { - AtomicReference taskRef = - _deregisterParticipantsTasks.computeIfAbsent(participant, k -> new AtomicReference<>()); - DeregisterParticipantsTask prevTask = taskRef.get(); - if (prevTask != null) { - prevTask.cancel(); + public void scheduleParticipantDeregistration(long delay) { + logger.info("Scheduled deregister participants task for cluster {}.", _helixManager.getClusterName()); + long currentTime = System.currentTimeMillis(); + long deregistrationTime = currentTime + delay; + if (delay > 0) { + ParticipantDeregistrationTask preTask = _nextParticipantDeregistrationTask.get(); + if (preTask != null && preTask.getNextDeregistrationTime() > currentTime + && preTask.getNextDeregistrationTime() < deregistrationTime) { + // already have an earlier participant deregistration scheduled, no need to schedule again. + return; } - taskRef.set(newTask); - } - _deregisterParticipantsTimer.schedule(newTask, delay); - //gspencer TODO: remove print statement for debugging - System.out.println("Scheduled deregister for " + participantsToDeregister); - System.out.println("Currently scheduled tasks: " + _deregisterParticipantsTasks); - logger.info("Scheduled deregister participants task for cluster {} on instances: {}.", - _helixManager.getClusterName(), participantsToDeregister); - } - - public void unscheduleDeregisterParticipantsTask(Set participantsToUnschedule) { - if (participantsToUnschedule == null || participantsToUnschedule.isEmpty()) { - return; } - System.out.println("Tasks scheduled before removing: " + _deregisterParticipantsTasks); - for (String participant : participantsToUnschedule) { - if (_deregisterParticipantsTasks.containsKey(participant)) { - AtomicReference taskRef = _deregisterParticipantsTasks.get(participant); - DeregisterParticipantsTask task = taskRef.get(); - if (task != null) { - task.cancel(); - } - _deregisterParticipantsTasks.remove(participant); - } + ParticipantDeregistrationTask newTask = new ParticipantDeregistrationTask(_helixManager, delay); + _participantDeregistrationTimer.schedule(newTask, delay); + logger.info("Scheduled deregister participants task for cluster {} to occur at time {}.", + _helixManager.getClusterName(), deregistrationTime); + + ParticipantDeregistrationTask preTask = _nextParticipantDeregistrationTask.getAndSet(newTask); + if (preTask != null) { + preTask.cancel(); } - System.out.println("Unscheduled deregister for " + participantsToUnschedule); - System.out.println("Currently scheduled tasks: " + _deregisterParticipantsTasks); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index 997f0f8aae..4523deca99 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -1051,6 +1051,7 @@ private void updateOfflineInstanceHistory(HelixDataAccessor accessor) { if (!_updateInstanceOfflineTime) { return; } + List offlineNodes = new ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet()); offlineNodes.removeAll(_allLiveInstanceCache.getPropertyMap().keySet()); _instanceOfflineTimeMap = new HashMap<>(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java index 65f6bb4e16..717e77449e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java @@ -40,5 +40,6 @@ public enum ClusterEventType { ControllerChange, RetryRebalance, StateVerifier, + ParticipantDeregistration, Unknown } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java index fc4a2dbd03..debb30e8f9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java @@ -1,13 +1,18 @@ package org.apache.helix.controller.stages; -import java.util.List; +import java.util.HashSet; +import java.util.Map; import java.util.Set; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.model.ClusterConfig; -import org.apache.helix.util.ParticipantDeregistrationUtil; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ParticipantHistory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +27,6 @@ public AsyncWorkerType getAsyncWorkerType() { @Override public void execute(ClusterEvent event) throws Exception { - _eventId = event.getEventId(); HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); ClusterConfig clusterConfig = manager.getConfigAccessor().getClusterConfig(manager.getClusterName()); if (clusterConfig == null || !clusterConfig.isParticipantDeregistrationEnabled()) { @@ -31,20 +35,100 @@ public void execute(ClusterEvent event) throws Exception { return; } + + // scan all offlineTimeMap: + // Create list of nodes to deregister - can we batch deregister these? + // detect the smallest difference between now() and offlineTime + delay + // probably need to use same now() time for all nodes so there is not condition where we miss a node because time elapsed between deregister check and queue check + // schedule participantDeregistration for that earliest time + + ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); - Set newlyOfflineInstanceNames = event.getAttribute(AttributeName.NEWLY_OFFLINE_INSTANCE_NAMES.name()); - Set newlyOnlineInstanceNames = event.getAttribute(AttributeName.NEWLY_ONLINE_INSTANCE_NAMES.name()); // gpsencer TODO: bypass any nodes not in cluster? Or should we let the deregister logic no op on them - List instancesInCluster = manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName()); - newlyOfflineInstanceNames.retainAll(instancesInCluster); - - Set deregisteredParticipants = ParticipantDeregistrationUtil.deregisterParticipants( - manager, cache, newlyOfflineInstanceNames, false); - newlyOfflineInstanceNames.removeAll(deregisteredParticipants); - if (!newlyOfflineInstanceNames.isEmpty()) { - ParticipantDeregistrationUtil.scheduleDeregisterParticipants(manager.getClusterName(), - newlyOfflineInstanceNames, clusterConfig.getParticipantDeregistrationTimeout()); + Map offlineTimeMap = cache.getInstanceOfflineTimeMap(); + long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout(); + long stageStartTime = System.currentTimeMillis(); + Set participantsToDeregister = new HashSet<>(); + Long earliestDeregisterTime = Long.MAX_VALUE; + + + for (Map.Entry entry : offlineTimeMap.entrySet()) { + String instanceName = entry.getKey(); + Long offlineTime = entry.getValue(); + long deregisterTime = offlineTime + deregisterDelay; + + // Skip if instance is still online + if (offlineTime == ParticipantHistory.ONLINE) { + continue; + } + + if (deregisterTime <= stageStartTime) { + participantsToDeregister.add(instanceName); + } else { + if (deregisterTime < earliestDeregisterTime) { + earliestDeregisterTime = deregisterTime; + } + } + } + + Set deregisteredParticipants = deregisterParticipants(manager, cache, participantsToDeregister); + System.out.println("successfully deregistered: " + deregisteredParticipants); + if (earliestDeregisterTime != Long.MAX_VALUE) { + long delay = earliestDeregisterTime - stageStartTime; + scheduleParticipantDeregistration(manager.getClusterName(), participantsToDeregister, delay); + } + } + + private Set deregisterParticipants(HelixManager manager, ResourceControllerDataProvider cache, + Set instancesToDeregister) { + Set successfullyDeregisteredInstances = new HashSet<>(); + + if (manager == null || !manager.isConnected() || cache == null || instancesToDeregister == null) { + LOG.info("ParticipantDeregistrationStage failed due to HelixManager being null or not connected!"); + return successfullyDeregisteredInstances; + } + + for (String instanceName : instancesToDeregister) { + // check if instance is alive or does not have INSTANCEOPERATION = UNKNOWN + // if either true, then no-op + // if both false, then call admin API to drop the participant + InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName); + LiveInstance liveInstance = cache.getLiveInstances().get(instanceName); + + if (instanceConfig == null ) { + LOG.debug("Instance config is null for instance {}, skip deregistering the instance", instanceName); + continue; + } + + if (liveInstance != null && liveInstance.isValid()) { + LOG.debug("Instance {} is still alive, skip deregistering the instance", instanceName); + continue; + } + + LOG.info("Deregistering instance {} from cluster {}", instanceName, cache.getClusterName()); + try { + manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), instanceConfig); + successfullyDeregisteredInstances.add(instanceName); + } catch (HelixException e) { + LOG.error("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e); + } + } + + return successfullyDeregisteredInstances; + } + + private void scheduleParticipantDeregistration(String clusterName, Set instancesToCheck, long delay) { + if (clusterName == null) { + LOG.error("Failed to schedule deregister participants. Cluster name is null."); + return; + } + GenericHelixController leaderController = + GenericHelixController.getLeaderController(clusterName); + if (leaderController != null) { + leaderController.scheduleParticipantDeregistration(delay); + } else { + LOG.error("Failed to schedule deregister participants task for instances {}. " + + "Controller for cluster {} does not exist.", instancesToCheck, clusterName); } - ParticipantDeregistrationUtil.unscheduleDeregisterParticipants(manager.getClusterName(), newlyOnlineInstanceNames); } } diff --git a/helix-core/src/main/java/org/apache/helix/util/ParticipantDeregistrationUtil.java b/helix-core/src/main/java/org/apache/helix/util/ParticipantDeregistrationUtil.java deleted file mode 100644 index 4ebbf563f5..0000000000 --- a/helix-core/src/main/java/org/apache/helix/util/ParticipantDeregistrationUtil.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.apache.helix.util; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; -import org.apache.helix.constants.InstanceConstants; -import org.apache.helix.controller.GenericHelixController; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class ParticipantDeregistrationUtil { - private static final Logger LOG = LoggerFactory.getLogger(ParticipantDeregistrationUtil.class); - - public static Set deregisterParticipants(HelixManager manager, ResourceControllerDataProvider cache, - Set instancesToCheck, boolean force) { - Set successfullyDeregisteredInstances = new HashSet<>(); - - if (manager == null || !manager.isConnected() || cache == null || instancesToCheck == null) { - LOG.info("ParticipantDeregistrationStage failed due to HelixManager being null or not connected!"); - return successfullyDeregisteredInstances; - } - - for (String instanceName : instancesToCheck) { - // check if instance is alive or does not have INSTANCEOPERATION = UNKNOWN - // if either true, then no-op - // if both false, then call admin API to drop the participant - InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName); - LiveInstance liveInstance = cache.getLiveInstances().get(instanceName); - - if (instanceConfig == null ) { - LOG.debug("Instance config is null for instance {}, skip deregistering the instance", instanceName); - continue; - } - - if (!force && !InstanceConstants.InstanceOperation.DEREGISTER.equals( - instanceConfig.getInstanceOperation().getOperation())) { - LOG.debug("Instance {} does not have InstanceOperation of {}, skip deregistering the instance", - instanceName, InstanceConstants.InstanceOperation.DEREGISTER.name()); - continue; - } - - if (liveInstance != null && liveInstance.isValid()) { - LOG.debug("Instance {} is still alive, skip deregistering the instance", instanceName); - continue; - } - - LOG.info("Deregistering instance {} from cluster {}", instanceName, cache.getClusterName()); - try { - // gspencer TODO: remove print, just for testing - System.out.println("Removing instance " + instanceName); - unscheduleDeregisterParticipants(cache.getClusterName(), Collections.singleton(instanceName)); - manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), instanceConfig); - successfullyDeregisteredInstances.add(instanceName); - } catch (HelixException e) { - LOG.error("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e); - } - } - - return successfullyDeregisteredInstances; - } - - public static void scheduleDeregisterParticipants(String clusterName, Set instancesToCheck, long delay) { - if (clusterName == null) { - LOG.error("Failed to schedule deregister participants. Cluster name is null."); - return; - } - GenericHelixController leaderController = - GenericHelixController.getLeaderController(clusterName); - if (leaderController != null) { - leaderController.scheduleDeregisterParticipantsTask(instancesToCheck, delay); - } else { - LOG.error("Failed to schedule deregister participants task for instances {}. " - + "Controller for cluster {} does not exist.", instancesToCheck, clusterName); - } - } - - public static void unscheduleDeregisterParticipants(String clusterName, Set instancesToCheck) { - if (clusterName == null) { - LOG.error("Failed to schedule deregister participants. Cluster name is null."); - return; - } - GenericHelixController leaderController = - GenericHelixController.getLeaderController(clusterName); - if (leaderController != null) { - leaderController.unscheduleDeregisterParticipantsTask(instancesToCheck); - } else { - LOG.error("Failed to unschedule deregister participants task for instances {}. " - + "Controller for cluster {} does not exist.", instancesToCheck, clusterName); - } - } -} diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java index 49fc612d92..3253e8c420 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java @@ -1,7 +1,6 @@ package org.apache.helix.controller.stages; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.List; import org.apache.helix.ConfigAccessor; @@ -9,13 +8,12 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; -import org.apache.helix.constants.InstanceConstants; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ParticipantHistory; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; -import org.apache.helix.util.ParticipantDeregistrationUtil; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -31,6 +29,7 @@ public class TestParticipantDeregistrationStage extends ZkTestBase { private HelixAdmin _admin; private HelixDataAccessor _dataAccessor; private ClusterControllerManager _controller; + private ConfigAccessor _configAccessor; @BeforeClass public void beforeClass() { @@ -46,7 +45,7 @@ public void beforeClass() { .setZkAddr(ZK_ADDR).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) .build(); - setAutoDeregisterConfigs(CLUSTER_NAME, true, DEREGISTER_TIMEOUT); + _configAccessor = new ConfigAccessor(_gZkClient); // start controller String controllerName = CONTROLLER_PREFIX + "_0"; @@ -55,33 +54,12 @@ public void beforeClass() { _admin = _gSetupTool.getClusterManagementTool(); _dataAccessor = _controller.getHelixDataAccessor(); - } - - // Asserts that a node that is marked as DEREGISTER will immediately be removed from the cluster after it goes offline - @Test - public void testParticipantLeavesAfterOffline() throws Exception { - System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " - + new Date(System.currentTimeMillis())); - MockParticipantManager participantToDeregister = _participants.get(0); - _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, - participantToDeregister.getInstanceName(), InstanceConstants.InstanceOperation.DEREGISTER); - _bestPossibleClusterVerifier.verify(); - - participantToDeregister.syncStop(); - boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME) - .contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION); - Assert.assertTrue(result, "Participant should have been deregistered"); - - dropParticipant(CLUSTER_NAME, participantToDeregister); - addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName()); - System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " - + new Date(System.currentTimeMillis())); + setAutoDeregisterConfigs(CLUSTER_NAME, true, DEREGISTER_TIMEOUT); } - // Asserts that a node that is not marked DEREGISTER will still be removed from the cluster after it exceeds - // the deregister timeout set in the cluster config - @Test (dependsOnMethods = "testParticipantLeavesAfterOffline") + // Asserts that a node will be removed from the cluster after it exceedsthe deregister timeout set in the cluster config + @Test public void testParticipantAutoLeavesAfterOfflineTimeout() throws Exception { System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -98,11 +76,11 @@ public void testParticipantAutoLeavesAfterOfflineTimeout() throws Exception { + new Date(System.currentTimeMillis())); } - // Asserts that a node that is not marked as DEREGISTER will not be removed from the cluster if it comes back online - // before the deregister timeout and that the deregister timeout is reset, so the node will not be removed until time + // Asserts that will not be removed from the cluster if it comes back online before the deregister timeout + // and that the deregister timeout is reset, so the node will not be removed until time // of last offline + deregister timeout @Test (dependsOnMethods = "testParticipantAutoLeavesAfterOfflineTimeout") - public void testParticipantUnschedulesDeregister() throws Exception { + public void testReconnectedParticipantNotDeregisteredWhenLive() throws Exception { System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -138,48 +116,44 @@ public void testParticipantUnschedulesDeregister() throws Exception { + new Date(System.currentTimeMillis())); } - @Test (dependsOnMethods = "testParticipantUnschedulesDeregister") + + // Same assertions as above but this time the node is re-killed immediately after being added back + @Test (dependsOnMethods = "testReconnectedParticipantNotDeregisteredWhenLive") public void testFlappingParticipantIsNotDeregistered() throws Exception { System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); - // Schedule a deregister MockParticipantManager participantToDeregister = _participants.get(0); - System.out.println(TestHelper.getTestMethodName() + " manually scheduling deregister for " - + participantToDeregister.getInstanceName()); - ParticipantDeregistrationUtil.scheduleDeregisterParticipants(CLUSTER_NAME, - Collections.singleton(participantToDeregister.getInstanceName()), DEREGISTER_TIMEOUT); + // Kill instance so deregister is scheduled + LiveInstance liveInstance = _dataAccessor.getProperty( + _dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); + participantToDeregister.syncStop(); - // Sleep + // Sleep for more than half the deregister timeout Thread.sleep(DEREGISTER_TIMEOUT * 3/5); + // Manually recreate live instance so controller thinks it's back online, then immediately delete + _dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()), + liveInstance); + ParticipantHistory participantHistory = _dataAccessor.getProperty(_dataAccessor.keyBuilder() + .participantHistory(participantToDeregister.getInstanceName())); + participantHistory.reportOnline("foo", "bar"); + _dataAccessor.setProperty(_dataAccessor.keyBuilder().participantHistory(participantToDeregister.getInstanceName()), + participantHistory); - // Drop the participant - //gspencer TODO: remove print debug statements - // Figure out what event is causing: - // Unscheduled deregister for [localhost_3, localhost_2, localhost_1, localhost_0, localhost_4] - System.out.println(TestHelper.getTestMethodName() + " dropping " + participantToDeregister.getInstanceName()); - dropParticipant(CLUSTER_NAME, participantToDeregister); - Assert.assertFalse(_admin.getInstancesInCluster(CLUSTER_NAME) - .contains(participantToDeregister.getInstanceName()), "Participant should have been dropped"); - - // Re add the participant and immediately kill it - System.out.println("Readding participant " + participantToDeregister.getInstanceName()); - MockParticipantManager rebornParticipant = addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName()); - rebornParticipant.syncStop(); - Assert.assertTrue(_admin.getInstancesInCluster(CLUSTER_NAME) - .contains(participantToDeregister.getInstanceName()), "Participant should have been re-added"); + _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); - // assert that the instance is still in the cluster + // assert that the instance is still in the cluster after original deregistration time should have passed long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() < startTime + DEREGISTER_TIMEOUT - 1000) { + while (System.currentTimeMillis() < startTime + (DEREGISTER_TIMEOUT * 3/5)) { Assert.assertTrue(_admin.getInstancesInCluster(CLUSTER_NAME) - .contains(participantToDeregister.getInstanceName()), "Participant should not have been deregistered" - + "within the renewed deregister timeout"); + .contains(participantToDeregister.getInstanceName()), "Participant should not have been deregistered"); } + // Re kill and assert that the instance is deregistered + _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME) - .contains(rebornParticipant.getInstanceName()), TestHelper.WAIT_DURATION); + .contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION); Assert.assertTrue(result, "Participant should have been deregistered"); dropParticipant(CLUSTER_NAME, participantToDeregister); @@ -188,6 +162,54 @@ public void testFlappingParticipantIsNotDeregistered() throws Exception { + new Date(System.currentTimeMillis())); } + @Test (dependsOnMethods = "testFlappingParticipantIsNotDeregistered") + public void testClusterConfigChangeImmediatelyTriggersDeregistration() throws Exception { + System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + + // Set to deregister to disabled + long testDeregisterTimeout = 1000; + setAutoDeregisterConfigs(CLUSTER_NAME, false, testDeregisterTimeout); + + List killedParticipants = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + MockParticipantManager participantToKill = addParticipant(CLUSTER_NAME, "participants_to_kill_" + i); + participantToKill.syncStop(); + killedParticipants.add(participantToKill); + } + + Thread.sleep(testDeregisterTimeout); + // Trigger on disable --> enable deregister + setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout); + + boolean result = TestHelper.verify(() -> { + List instances = _admin.getInstancesInCluster(CLUSTER_NAME); + return killedParticipants.stream().noneMatch(participant -> instances.contains(participant.getInstanceName())); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participants should have been deregistered"); + + // Trigger on enable --> enable with shorter deregister + setAutoDeregisterConfigs(CLUSTER_NAME, true, 1000000); + + List replacementParticipants = new ArrayList<>(); + //reset dead participants + killedParticipants.forEach(participant -> { + dropParticipant(CLUSTER_NAME, participant); + MockParticipantManager replacement = addParticipant(CLUSTER_NAME, participant.getInstanceName()); + replacement.syncStop(); + replacementParticipants.add(replacement); + }); + + Thread.sleep(testDeregisterTimeout*5); + setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout); + + result = TestHelper.verify(() -> { + List instances = _admin.getInstancesInCluster(CLUSTER_NAME); + return replacementParticipants.stream().noneMatch(participant -> instances.contains(participant.getInstanceName())); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participants should have been deregistered"); + } + @Override public void dropParticipant(String clusterName, MockParticipantManager participant) { _participants.remove(participant); @@ -202,10 +224,9 @@ public MockParticipantManager addParticipant(String clusterName, String instance } private void setAutoDeregisterConfigs(String clusterName, boolean enabled, long timeout) { - ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); - ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); clusterConfig.setParticipantDeregistrationEnabled(enabled); clusterConfig.setParticipantDeregistrationTimeout(timeout); - configAccessor.setClusterConfig(clusterName, clusterConfig); + _configAccessor.setClusterConfig(clusterName, clusterConfig); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java index 7c93c33b69..c2b57f3904 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java @@ -110,12 +110,10 @@ public void testAddResourceWhenInstancesDisabledWithinWindow() { + new Date(System.currentTimeMillis())); } - private MockParticipantManager addParticipant(String cluster, String instanceName) { - _gSetupTool.addInstanceToCluster(cluster, instanceName); - MockParticipantManager toAddParticipant = - new MockParticipantManager(ZK_ADDR, cluster, instanceName); + @Override + public MockParticipantManager addParticipant(String clusterName, String instanceName) { + MockParticipantManager toAddParticipant = super.addParticipant(clusterName, instanceName); _participants.add(toAddParticipant); - toAddParticipant.syncStart(); return toAddParticipant; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java index d7dc442054..c5946a331f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java @@ -132,7 +132,7 @@ public void testForceKillDropsAssignment() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -166,7 +166,7 @@ public void testSessionExpiration() throws Exception { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -203,7 +203,7 @@ public void testDisconnectReconnect() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -236,7 +236,7 @@ public void testRemoveUnknownOperationAfterForceKill() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -271,7 +271,7 @@ public void testSessionExpirationWithoutUnknownOperation() throws Exception { "Instance should have assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -309,7 +309,7 @@ public void testDisconnectReconnectWithoutUnknownOperation() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -341,7 +341,7 @@ public void testLiveInstanceZNodeImmediatelyRecreated() { _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(instanceToKillName)); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -375,7 +375,7 @@ public void testDownwardStateTransitionsBlocked() throws Exception { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -411,13 +411,14 @@ public void testForceKillWithBlockedDownwardStateTransition() throws Exception { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); } - private MockParticipantManager addParticipant(String cluster, String instanceName) { + @Override + public MockParticipantManager addParticipant(String cluster, String instanceName) { _gSetupTool.addInstanceToCluster(cluster, instanceName); MockParticipantManager toAddParticipant = new MockParticipantManager(ZK_ADDR, cluster, instanceName); @@ -429,19 +430,10 @@ private MockParticipantManager addParticipant(String cluster, String instanceNam return toAddParticipant; } - protected void dropParticipant(String cluster, String instanceName) { - // find mock participant manager with instanceName and remove it from _mockParticipantManagers. - MockParticipantManager toRemoveManager = _participants.stream() - .filter(manager -> manager.getInstanceName().equals(instanceName)) - .findFirst() - .orElse(null); - if (toRemoveManager != null) { - toRemoveManager.syncStop(); - _participants.remove(toRemoveManager); - } - - InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, instanceName); - _gSetupTool.getClusterManagementTool().dropInstance(cluster, instanceConfig); + @Override + public void dropParticipant(String cluster, MockParticipantManager participant) { + _participants.remove(participant); + super.dropParticipant(cluster, participant); } private Map getEVs() {