Skip to content

Commit

Permalink
Enhance TableRebalancer to support no-downtime rebalance for strict r…
Browse files Browse the repository at this point in the history
…eplica-group routing tables (#6212)

On top of #6208, this PR enhances the TableRebalancer to support the no-downtime rebalance for strict replica-group routing, which still hold the minimum available replicas requirement.
  • Loading branch information
Jackie-Jiang authored Nov 2, 2020
1 parent aa883b8 commit e0f15aa
Show file tree
Hide file tree
Showing 2 changed files with 404 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.commons.configuration.Configuration;
Expand All @@ -46,6 +48,7 @@
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
Expand Down Expand Up @@ -127,12 +130,15 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc
int minReplicasToKeepUpForNoDowntime = rebalanceConfig
.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
boolean enableStrictReplicaGroup =
tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
.equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType());
boolean bestEfforts = rebalanceConfig
.getBoolean(RebalanceConfigConstants.BEST_EFFORTS, RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
LOGGER.info(
"Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, bestEfforts: {}",
"Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime,
minReplicasToKeepUpForNoDowntime, bestEfforts);
minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, bestEfforts);

// Validate table config
try {
Expand Down Expand Up @@ -334,8 +340,8 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc
minAvailableReplicas = Math.max(numReplicas + minReplicasToKeepUpForNoDowntime, 0);
}

LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {}", tableNameWithType,
minAvailableReplicas, bestEfforts);
LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts);
int expectedVersion = currentIdealState.getRecord().getVersion();
while (true) {
// Wait for ExternalView to converge before updating the next IdealState
Expand Down Expand Up @@ -373,16 +379,18 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc
}

if (currentAssignment.equals(targetAssignment)) {
LOGGER.info("Finished rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {} in {}ms.",
tableNameWithType, minAvailableReplicas, bestEfforts, System.currentTimeMillis() - startTimeMs);
LOGGER.info(
"Finished rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {} in {}ms.",
tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts,
System.currentTimeMillis() - startTimeMs);
return new RebalanceResult(RebalanceResult.Status.DONE,
"Success with minAvailableReplicas: " + minAvailableReplicas
+ " (both IdealState and ExternalView should reach the target segment assignment)",
instancePartitionsMap, targetAssignment);
}

Map<String, Map<String, String>> nextAssignment =
getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup);
LOGGER.info("Got the next assignment for table: {} with number of segments to be moved to each instance: {}",
tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
Expand Down Expand Up @@ -546,22 +554,76 @@ static boolean isExternalViewConverged(String tableNameWithType,
return true;
}

private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas) {
/**
* Returns the next assignment for the table based on the current assignment and the target assignment with regards to
* the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
*/
@VisibleForTesting
static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup) {
return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
minAvailableReplicas)
: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
}

private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
int minAvailableReplicas) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> currentInstanceStateMap = entry.getValue();
Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
SingleSegmentAssignment assignment =
getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas);
Set<String> assignedInstances = assignment._instanceStateMap.keySet();
Set<String> availableInstances = assignment._availableInstances;
availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
if (currentAvailableInstances == null) {
// First segment assigned to these instances, use the new assignment and update the available instances
nextAssignment.put(segmentName, assignment._instanceStateMap);
return availableInstances;
} else {
// There are other segments assigned to the same instances, check the available instances to see if adding the
// new assignment can still hold the minimum available replicas requirement
availableInstances.retainAll(currentAvailableInstances);
if (availableInstances.size() >= minAvailableReplicas) {
// New assignment can be added
nextAssignment.put(segmentName, assignment._instanceStateMap);
return availableInstances;
} else {
// New assignment cannot be added, use the current instance state map
nextAssignment.put(segmentName, currentInstanceStateMap);
return currentAvailableInstances;
}
}
});
}
return nextAssignment;
}

private static Map<String, Map<String, String>> getNextNonStrictReplicaGroupAssignment(
Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
int minAvailableReplicas) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
String segmentName = entry.getKey();
nextAssignment.put(segmentName,
getNextInstanceStateMap(entry.getValue(), targetAssignment.get(segmentName), minAvailableReplicas));
getNextSingleSegmentAssignment(entry.getValue(), targetAssignment.get(segmentName),
minAvailableReplicas)._instanceStateMap);
}

return nextAssignment;
}

/**
* Returns the next assignment for a segment based on the current instance state map and the target instance state map
* with regards to the minimum available replicas requirement.
*/
@VisibleForTesting
@SuppressWarnings("Duplicates")
static Map<String, String> getNextInstanceStateMap(Map<String, String> currentInstanceStateMap,
static SingleSegmentAssignment getNextSingleSegmentAssignment(Map<String, String> currentInstanceStateMap,
Map<String, String> targetInstanceStateMap, int minAvailableReplicas) {
Map<String, String> nextInstanceStateMap = new TreeMap<>();

Expand All @@ -586,6 +648,7 @@ static Map<String, String> getNextInstanceStateMap(Map<String, String> currentIn
}
}
}
Set<String> availableInstances = new TreeSet<>(nextInstanceStateMap.keySet());

// Add target instances until the number of instances matched
int instancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
Expand All @@ -601,6 +664,21 @@ static Map<String, String> getNextInstanceStateMap(Map<String, String> currentIn
}
}

return nextInstanceStateMap;
return new SingleSegmentAssignment(nextInstanceStateMap, availableInstances);
}

/**
* Assignment result for a single segment.
*/
@VisibleForTesting
static class SingleSegmentAssignment {
final Map<String, String> _instanceStateMap;
// Instances that are common in both current instance state and next instance state of the segment
final Set<String> _availableInstances;

SingleSegmentAssignment(Map<String, String> instanceStateMap, Set<String> availableInstances) {
_instanceStateMap = instanceStateMap;
_availableInstances = availableInstances;
}
}
}
Loading

0 comments on commit e0f15aa

Please sign in to comment.