Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Sep 27, 2024
1 parent eb5e4f6 commit 626e45a
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.util.ParticipantDeregistrationUtil;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -548,7 +547,10 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) {
// backward compatibility check
Pipeline liveInstancePipeline = new Pipeline(pipelineName);
liveInstancePipeline.addStage(new CompatibilityCheckStage());
liveInstancePipeline.addStage(new ParticipantDeregistrationStage());

// Participant Deregistration Pipeline
Pipeline participantDeregistrationPipeline = new Pipeline(pipelineName);
participantDeregistrationPipeline.addStage(new ParticipantDeregistrationStage());

// auto-exit maintenance mode if applicable
Pipeline autoExitMaintenancePipeline = new Pipeline(pipelineName);
Expand All @@ -564,11 +566,11 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) {
rebalancePipeline);
registry
.register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline,
dataPreprocess, rebalancePipeline);
dataPreprocess, rebalancePipeline, participantDeregistrationPipeline);
registry
.register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline,
liveInstancePipeline, dataPreprocess, externalViewPipeline, customizedViewPipeline,
rebalancePipeline);
rebalancePipeline, participantDeregistrationPipeline);
registry
.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline,
Expand All @@ -588,6 +590,8 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) {
customizedViewPipeline, rebalancePipeline);
registry.register(ClusterEventType.CustomizeStateConfigChange, dataRefresh, dataPreprocess,
customizedViewPipeline, rebalancePipeline);
// gspencer todo: limit to just refresh instance and instance configs??
registry.register(ClusterEventType.ParticipantDeregistration, dataRefresh, participantDeregistrationPipeline);
return registry;
}
}
Expand Down Expand Up @@ -615,6 +619,10 @@ private static PipelineRegistry createTaskRegistry(String pipelineName) {
rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new TaskMessageDispatchStage());

// Participant Deregistration Pipeline
Pipeline participantDeregistrationPipeline = new Pipeline(pipelineName);
participantDeregistrationPipeline.addStage(new ParticipantDeregistrationStage());

// backward compatibility check
Pipeline liveInstancePipeline = new Pipeline(pipelineName);
liveInstancePipeline.addStage(new CompatibilityCheckStage());
Expand Down Expand Up @@ -708,8 +716,8 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR

_onDemandRebalanceTimer =
new Timer("GenericHelixController_" + _clusterName + "_onDemand_Timer", true);
_deregisterParticipantsTimer
= new Timer("GenericHelixController_" + _clusterName + "_deregisterParticipants_Timer", true);
_participantDeregistrationTimer
= new Timer("GenericHelixController_" + _clusterName + "_participantDeregistration_Timer", true);

// TODO: refactor to simplify below similar code of the 3 pipelines
// initialize pipelines at the end so we have everything else prepared
Expand Down Expand Up @@ -1123,28 +1131,6 @@ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
if (liveInstances == null) {
liveInstances = Collections.emptyList();
}

Set<String> newlyOfflineInstanceNames = new HashSet<>();

synchronized (_lastSeenInstances) {
if (_lastSeenInstances.get() != null) {
// Record the last seen instances
newlyOfflineInstanceNames = _lastSeenInstances.get().values().stream()
.map(LiveInstance::getInstanceName)
.collect(Collectors.toSet());
}
}

// Find newly offline and newly online instances
Set<String> currentLiveInstanceNames = liveInstances.stream()
.map(LiveInstance::getInstanceName)
.collect(Collectors.toSet());
Set<String> newlyOnlineInstanceNames = new HashSet<>(currentLiveInstanceNames);
newlyOnlineInstanceNames.removeAll(newlyOfflineInstanceNames);
newlyOfflineInstanceNames.removeAll(currentLiveInstanceNames);


// Go through the live instance list and make sure that we are observing them
// accordingly. The action is done regardless of the paused flag.
if (changeContext.getType() == NotificationContext.Type.INIT
|| changeContext.getType() == NotificationContext.Type.CALLBACK) {
Expand All @@ -1156,11 +1142,9 @@ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
liveInstances = Collections.emptyList();
checkLiveInstancesObservation(liveInstances, changeContext);
}
Map<String, Object> eventAttributes = new HashMap<>();
eventAttributes.put(AttributeName.eventData.name(), liveInstances);
eventAttributes.put(AttributeName.NEWLY_OFFLINE_INSTANCE_NAMES.name(), newlyOfflineInstanceNames);
eventAttributes.put(AttributeName.NEWLY_ONLINE_INSTANCE_NAMES.name(), newlyOnlineInstanceNames);
pushToEventQueues(ClusterEventType.LiveInstanceChange, changeContext, eventAttributes);

pushToEventQueues(ClusterEventType.LiveInstanceChange, changeContext,
Collections.<String, Object>singletonMap(AttributeName.eventData.name(), liveInstances));

logger.info(
"END: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName);
Expand Down Expand Up @@ -1664,75 +1648,52 @@ synchronized void closeRebalancer() {
}
}

Map<String, AtomicReference<DeregisterParticipantsTask>> _deregisterParticipantsTasks = new HashMap<>();
Timer _deregisterParticipantsTimer = null;
class DeregisterParticipantsTask extends TimerTask {

Timer _participantDeregistrationTimer = null;
AtomicReference<ParticipantDeregistrationTask> _nextParticipantDeregistrationTask = new AtomicReference<>();
class ParticipantDeregistrationTask extends TimerTask {
private final HelixManager _manager;
private final ResourceControllerDataProvider _cache;
private final Set<String> _participantsToDeregister;
private final long _nextDeregistrationTime;

public DeregisterParticipantsTask(HelixManager manager, ResourceControllerDataProvider cache,
Set<String> participantsToDeregister) {
public ParticipantDeregistrationTask(HelixManager manager, long delay) {
_manager = manager;
_cache = cache;
_participantsToDeregister = participantsToDeregister;
_nextDeregistrationTime = System.currentTimeMillis() + delay;
}

public long getNextDeregistrationTime() {
return _nextDeregistrationTime;
}

@Override
public void run() {
if (_participantsToDeregister.isEmpty()) {
return;
}

_cache.refresh(_manager.getHelixDataAccessor());
ParticipantDeregistrationUtil.deregisterParticipants(_manager, _cache, _participantsToDeregister, true);
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
pushToEventQueues(ClusterEventType.ParticipantDeregistration, changeContext, Collections.EMPTY_MAP);
}
}

public void scheduleDeregisterParticipantsTask(Set<String> participantsToDeregister, long delay) {
if (participantsToDeregister == null || participantsToDeregister.isEmpty() || delay < 0) {
return;
}

//gspencer TODO: remove print debug statements
System.out.println("Tasks scheduled before adding: " + _deregisterParticipantsTasks);

DeregisterParticipantsTask newTask = new DeregisterParticipantsTask(_helixManager, _resourceControlDataProvider,
participantsToDeregister);
for (String participant : participantsToDeregister) {
AtomicReference<DeregisterParticipantsTask> taskRef =
_deregisterParticipantsTasks.computeIfAbsent(participant, k -> new AtomicReference<>());
DeregisterParticipantsTask prevTask = taskRef.get();
if (prevTask != null) {
prevTask.cancel();
public void scheduleParticipantDeregistration(long delay) {
logger.info("Scheduled deregister participants task for cluster {}.", _helixManager.getClusterName());
long currentTime = System.currentTimeMillis();
long deregistrationTime = currentTime + delay;
if (delay > 0) {
ParticipantDeregistrationTask preTask = _nextParticipantDeregistrationTask.get();
if (preTask != null && preTask.getNextDeregistrationTime() > currentTime
&& preTask.getNextDeregistrationTime() < deregistrationTime) {
// already have an earlier participant deregistration scheduled, no need to schedule again.
return;
}
taskRef.set(newTask);
}
_deregisterParticipantsTimer.schedule(newTask, delay);
//gspencer TODO: remove print statement for debugging
System.out.println("Scheduled deregister for " + participantsToDeregister);
System.out.println("Currently scheduled tasks: " + _deregisterParticipantsTasks);
logger.info("Scheduled deregister participants task for cluster {} on instances: {}.",
_helixManager.getClusterName(), participantsToDeregister);
}

public void unscheduleDeregisterParticipantsTask(Set<String> participantsToUnschedule) {
if (participantsToUnschedule == null || participantsToUnschedule.isEmpty()) {
return;
}
System.out.println("Tasks scheduled before removing: " + _deregisterParticipantsTasks);

for (String participant : participantsToUnschedule) {
if (_deregisterParticipantsTasks.containsKey(participant)) {
AtomicReference<DeregisterParticipantsTask> taskRef = _deregisterParticipantsTasks.get(participant);
DeregisterParticipantsTask task = taskRef.get();
if (task != null) {
task.cancel();
}
_deregisterParticipantsTasks.remove(participant);
}
ParticipantDeregistrationTask newTask = new ParticipantDeregistrationTask(_helixManager, delay);
_participantDeregistrationTimer.schedule(newTask, delay);
logger.info("Scheduled deregister participants task for cluster {} to occur at time {}.",
_helixManager.getClusterName(), deregistrationTime);

ParticipantDeregistrationTask preTask = _nextParticipantDeregistrationTask.getAndSet(newTask);
if (preTask != null) {
preTask.cancel();
}
System.out.println("Unscheduled deregister for " + participantsToUnschedule);
System.out.println("Currently scheduled tasks: " + _deregisterParticipantsTasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,7 @@ private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
if (!_updateInstanceOfflineTime) {
return;
}

List<String> offlineNodes = new ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet());
offlineNodes.removeAll(_allLiveInstanceCache.getPropertyMap().keySet());
_instanceOfflineTimeMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ public enum ClusterEventType {
ControllerChange,
RetryRebalance,
StateVerifier,
ParticipantDeregistration,
Unknown
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package org.apache.helix.controller.stages;

import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.util.ParticipantDeregistrationUtil;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ParticipantHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,7 +27,6 @@ public AsyncWorkerType getAsyncWorkerType() {

@Override
public void execute(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
ClusterConfig clusterConfig = manager.getConfigAccessor().getClusterConfig(manager.getClusterName());
if (clusterConfig == null || !clusterConfig.isParticipantDeregistrationEnabled()) {
Expand All @@ -31,20 +35,100 @@ public void execute(ClusterEvent event) throws Exception {
return;
}


// scan all offlineTimeMap:
// Create list of nodes to deregister - can we batch deregister these?
// detect the smallest difference between now() and offlineTime + delay
// probably need to use the same now() time for all nodes so there is no condition where we miss a node because time elapsed between the deregister check and the queue check
// schedule participantDeregistration for that earliest time


ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
Set<String> newlyOfflineInstanceNames = event.getAttribute(AttributeName.NEWLY_OFFLINE_INSTANCE_NAMES.name());
Set<String> newlyOnlineInstanceNames = event.getAttribute(AttributeName.NEWLY_ONLINE_INSTANCE_NAMES.name());
// gpsencer TODO: bypass any nodes not in cluster? Or should we let the deregister logic no op on them
List<String> instancesInCluster = manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
newlyOfflineInstanceNames.retainAll(instancesInCluster);

Set<String> deregisteredParticipants = ParticipantDeregistrationUtil.deregisterParticipants(
manager, cache, newlyOfflineInstanceNames, false);
newlyOfflineInstanceNames.removeAll(deregisteredParticipants);
if (!newlyOfflineInstanceNames.isEmpty()) {
ParticipantDeregistrationUtil.scheduleDeregisterParticipants(manager.getClusterName(),
newlyOfflineInstanceNames, clusterConfig.getParticipantDeregistrationTimeout());
Map<String, Long> offlineTimeMap = cache.getInstanceOfflineTimeMap();
long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout();
long stageStartTime = System.currentTimeMillis();
Set<String> participantsToDeregister = new HashSet<>();
Long earliestDeregisterTime = Long.MAX_VALUE;


for (Map.Entry<String, Long> entry : offlineTimeMap.entrySet()) {
String instanceName = entry.getKey();
Long offlineTime = entry.getValue();
long deregisterTime = offlineTime + deregisterDelay;

// Skip if instance is still online
if (offlineTime == ParticipantHistory.ONLINE) {
continue;
}

if (deregisterTime <= stageStartTime) {
participantsToDeregister.add(instanceName);
} else {
if (deregisterTime < earliestDeregisterTime) {
earliestDeregisterTime = deregisterTime;
}
}
}

Set<String> deregisteredParticipants = deregisterParticipants(manager, cache, participantsToDeregister);
System.out.println("successfully deregistered: " + deregisteredParticipants);
if (earliestDeregisterTime != Long.MAX_VALUE) {
long delay = earliestDeregisterTime - stageStartTime;
scheduleParticipantDeregistration(manager.getClusterName(), participantsToDeregister, delay);
}
}

/**
 * Drops the given instances from the cluster, skipping any instance that has no instance config
 * in the cache or that still has a valid live instance entry.
 *
 * @param manager Helix manager whose cluster management tool performs the drop; must be connected
 * @param cache data provider used to look up instance configs, live instances and the cluster name
 * @param instancesToDeregister names of the candidate instances
 * @return names of the instances whose drop call completed without a {@link HelixException};
 *         empty when any precondition on the arguments fails
 */
private Set<String> deregisterParticipants(HelixManager manager, ResourceControllerDataProvider cache,
    Set<String> instancesToDeregister) {
  Set<String> successfullyDeregisteredInstances = new HashSet<>();

  if (manager == null || !manager.isConnected()) {
    LOG.info("ParticipantDeregistrationStage failed due to HelixManager being null or not connected!");
    return successfullyDeregisteredInstances;
  }
  // Reported separately so the log does not blame the manager for a missing cache or instance set.
  if (cache == null || instancesToDeregister == null) {
    LOG.info("ParticipantDeregistrationStage failed due to data provider or instance set being null!");
    return successfullyDeregisteredInstances;
  }

  String clusterName = cache.getClusterName();
  for (String instanceName : instancesToDeregister) {
    // No-op when the instance has no config or is still alive; otherwise drop it via the admin API.
    InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName);
    LiveInstance liveInstance = cache.getLiveInstances().get(instanceName);

    if (instanceConfig == null) {
      LOG.debug("Instance config is null for instance {}, skip deregistering the instance", instanceName);
      continue;
    }

    if (liveInstance != null && liveInstance.isValid()) {
      LOG.debug("Instance {} is still alive, skip deregistering the instance", instanceName);
      continue;
    }

    LOG.info("Deregistering instance {} from cluster {}", instanceName, clusterName);
    try {
      manager.getClusterManagmentTool().dropInstance(clusterName, instanceConfig);
      successfullyDeregisteredInstances.add(instanceName);
    } catch (HelixException e) {
      // One failed drop must not prevent the remaining instances from being processed.
      LOG.error("Failed to deregister instance {} from cluster {}", instanceName, clusterName, e);
    }
  }

  return successfullyDeregisteredInstances;
}

/**
 * Asks the leader controller of the given cluster to schedule a participant deregistration run.
 *
 * @param clusterName name of the cluster; when null an error is logged and nothing is scheduled
 * @param instancesToCheck instance names, used only in the error message when no leader
 *                         controller is found
 * @param delay delay in milliseconds passed through to the controller
 */
private void scheduleParticipantDeregistration(String clusterName, Set<String> instancesToCheck, long delay) {
  if (clusterName == null) {
    LOG.error("Failed to schedule deregister participants. Cluster name is null.");
    return;
  }

  GenericHelixController leaderController = GenericHelixController.getLeaderController(clusterName);
  if (leaderController == null) {
    LOG.error("Failed to schedule deregister participants task for instances {}. "
        + "Controller for cluster {} does not exist.", instancesToCheck, clusterName);
    return;
  }

  leaderController.scheduleParticipantDeregistration(delay);
}
}
Loading

0 comments on commit 626e45a

Please sign in to comment.