Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move existing assignments usage calculation to pre-process stage #2888

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@
import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.MissingTopStateRecord;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -583,6 +586,37 @@ public Set<CapacityNode> getSimpleCapacitySet() {
return _simpleCapacitySet;
}

public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet,
final CurrentStateOutput currentStateOutput) {
// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> simpleCapacityMap = new HashMap<>();
for (CapacityNode node : _simpleCapacitySet) {
simpleCapacityMap.put(node.getId(), node);
}
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
for (String resourceName : resourceNameSet) {
// Process current state mapping
for (Map.Entry<Partition, Map<String, String>> entry : currentStateOutput.getCurrentStateMap(
resourceName).entrySet()) {
for (String instanceName : entry.getValue().keySet()) {
CapacityNode node = simpleCapacityMap.get(instanceName);
if (node != null) {
node.canAdd(resourceName, entry.getKey().getPartitionName());
}
}
}
// Process pending state mapping
for (Map.Entry<Partition, Map<String, Message>> entry : currentStateOutput.getPendingMessageMap(
resourceName).entrySet()) {
for (String instanceName : entry.getValue().keySet()) {
CapacityNode node = simpleCapacityMap.get(instanceName);
if (node != null) {
node.canAdd(resourceName, entry.getKey().getPartitionName());
}
}
}
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void refreshDisabledInstancesForAllPartitionsSet() {
_disabledInstancesForAllPartitionsSet.clear();
Collection<InstanceConfig> allConfigs = getInstanceConfigMap().values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,12 @@ public ConditionBasedRebalancer(List<RebalanceCondition> rebalanceConditions) {
@Override
public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) {
if (!this._rebalanceConditions.stream()
ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName);
// If previous placement list exists in cache && all condition met -> return cached value
if (cachedIdealState != null && cachedIdealState.getListFields() != null
&& !cachedIdealState.getListFields().isEmpty() && !this._rebalanceConditions.stream()
.allMatch(condition -> condition.shouldPerformRebalance(clusterData))) {
ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName);
if (cachedIdealState != null) {
return new IdealState(cachedIdealState);
}
// In theory, the cache should be populated already if no rebalance is needed
LOG.warn(
"Cannot fetch the cached Ideal State for resource: {}, will recompute the Ideal State",
resourceName);
return new IdealState(cachedIdealState);
}

LOG.info("Computing IdealState for " + resourceName);
Expand Down Expand Up @@ -189,18 +185,16 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId
public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resource.getResourceName());
if (!this._rebalanceConditions.stream()
// If previous assignment map exists in cache && all condition met -> return cached value
if (cachedIdealState.getMapFields() != null && !cachedIdealState.getMapFields().isEmpty()
&& !this._rebalanceConditions.stream()
.allMatch(condition -> condition.shouldPerformRebalance(cache))) {
if (cachedIdealState != null && cachedIdealState.getMapFields() != null) {
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName());
for (Partition partition : resource.getPartitions()) {
partitionMapping.addReplicaMap(partition, cachedIdealState.getMapFields().get(partition));
}
return new ResourceAssignment(cachedIdealState);
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName());
for (Partition partition : resource.getPartitions()) {
partitionMapping.addReplicaMap(partition,
cachedIdealState.getMapFields().get(partition.getPartitionName()));
}
// In theory, the cache should be populated already if no rebalance is needed
LOG.warn("Cannot fetch the cached assignment for resource: {}, will recompute the assignment",
resource.getResourceName());
return partitionMapping;
}

LOG.info("Computing BestPossibleMapping for " + resource.getResourceName());
Expand All @@ -212,6 +206,10 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa
cachedIdealState.setMapFields(assignment.getRecord().getMapFields());
cache.setCachedOndemandIdealState(resource.getResourceName(), cachedIdealState);

if (LOG.isDebugEnabled()) {
LOG.debug("Processed resource: {}", resource.getResourceName());
LOG.debug("Final Mapping of resource : {}", assignment);
}
return assignment;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.slf4j.LoggerFactory;

public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> {
private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class);
private static final Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class);
private String _resourceName;
private List<String> _partitions;
private LinkedHashMap<String, Integer> _states;
Expand Down Expand Up @@ -70,52 +70,50 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
return znRecord;
}

// Sort the assignable nodes by id
List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet());
assignableNodes.sort(Comparator.comparing(CapacityNode::getId));

// Filter out the nodes if not in the liveNodes parameter
// Note the liveNodes parameter here might be processed within the rebalancer, e.g. filter based on tags
Set<CapacityNode> assignableNodeSet = new HashSet<>(clusterData.getSimpleCapacitySet());
Set<String> liveNodesSet = new HashSet<>(liveNodes);
assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId()));
assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId()));

// Populate valid state map given current mapping
Map<String, Map<String, String>> stateMap =
populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes);
Map<String, Set<String>> stateMap =
populateValidAssignmentMapFromCurrentMapping(currentMapping, assignableNodeSet);

if (logger.isDebugEnabled()) {
logger.debug("currentMapping: {}", currentMapping);
logger.debug("stateMap: {}", stateMap);
}

// Sort the assignable nodes by id
List<CapacityNode> assignableNodeList =
assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId))
.collect(Collectors.toList());

// Assign partitions to node by order.
for (int i = 0, index = 0; i < _partitions.size(); i++) {
int startIndex = index;
for (Map.Entry<String, Integer> entry : _states.entrySet()) {
String state = entry.getKey();
int stateReplicaNumber = entry.getValue();
// For this partition, compute existing number replicas
long existsReplicas =
stateMap.computeIfAbsent(_partitions.get(i), m -> new HashMap<>()).values().stream()
.filter(s -> s.equals(state)).count();
for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) {
while (index - startIndex < assignableNodes.size()) {
CapacityNode node = assignableNodes.get(index++ % assignableNodes.size());
if (node.canAdd(_resourceName, _partitions.get(i))) {
stateMap.get(_partitions.get(i)).put(node.getId(), state);
break;
}
int remainingReplica = _statesReplicaCount;
if (stateMap.containsKey(_partitions.get(i))) {
remainingReplica = remainingReplica - stateMap.get(_partitions.get(i)).size();
}
for (int j = 0; j < remainingReplica; j++) {
while (index - startIndex < assignableNodeList.size()) {
CapacityNode node = assignableNodeList.get(index++ % assignableNodeList.size());
if (node.canAdd(_resourceName, _partitions.get(i))) {
stateMap.computeIfAbsent(_partitions.get(i), m -> new HashSet<>()).add(node.getId());
break;
}
}

if (index - startIndex >= assignableNodes.size()) {
// If the all nodes have been tried out, then no node can be assigned.
logger.warn("No enough assignable nodes for resource: {}", _resourceName);
}
if (index - startIndex >= assignableNodeList.size()) {
// If the all nodes have been tried out, then no node can be assigned.
logger.warn("No enough assignable nodes for resource: {}", _resourceName);
}
}
}
for (Map.Entry<String, Map<String, String>> entry : stateMap.entrySet()) {
znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue().keySet()));
for (Map.Entry<String, Set<String>> entry : stateMap.entrySet()) {
znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue()));
}
if (logger.isDebugEnabled()) {
logger.debug("znRecord: {}", znRecord);
Expand All @@ -129,57 +127,27 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
*
* @param currentMapping the current mapping of partitions to node states
* @param assignableNodes the list of nodes that can be assigned
* @return a map of partitions to valid node states
* @return a map of partitions to valid nodes
*/
private Map<String, Map<String, String>> populateValidStateMapFromCurrentMapping(
private Map<String, Set<String>> populateValidAssignmentMapFromCurrentMapping(
final Map<String, Map<String, String>> currentMapping,
final List<CapacityNode> assignableNodes) {
Map<String, Map<String, String>> validStateMap = new HashMap<>();
final Set<CapacityNode> assignableNodes) {
Map<String, Set<String>> validAssignmentMap = new HashMap<>();
// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> assignableNodeMap =
assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, node -> node));
if (currentMapping != null) {
for (Map.Entry<String, Map<String, String>> entry : currentMapping.entrySet()) {
String partition = entry.getKey();
Map<String, String> currentNodeStateMap = new HashMap<>(entry.getValue());
// Skip if current node state is invalid with state model
if (!isValidStateMap(currentNodeStateMap)) {
continue;
}
// Filter out invalid node assignment
currentNodeStateMap.entrySet()
.removeIf(e -> !isValidNodeAssignment(partition, e.getKey(), assignableNodeMap));

validStateMap.put(partition, currentNodeStateMap);
}
}
return validStateMap;
}

/**
* Validates whether the provided state mapping is valid according to the defined state model.
*
* @param currentNodeStateMap A map representing the actual state mapping where the key is the node ID and the value is the state.
* @return true if the state map is valid, false otherwise
*/
private boolean isValidStateMap(final Map<String, String> currentNodeStateMap) {
// Check if the size of the current state map exceeds the total state count in state model
if (currentNodeStateMap.size() > _statesReplicaCount) {
return false;
}

Map<String, Integer> tmpStates = new HashMap<>(_states);
for (String state : currentNodeStateMap.values()) {
// Return invalid if:
// The state is not defined in the state model OR
// The state count exceeds the defined count in state model
if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) {
return false;
validAssignmentMap.put(partition, new HashSet<>(currentNodeStateMap.keySet()));
}
tmpStates.put(state, tmpStates.get(state) - 1);
}

return true;
return validAssignmentMap;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ public void process(ClusterEvent event) throws Exception {

handleResourceCapacityCalculation(event, (ResourceControllerDataProvider) cache, currentStateOutput);
}

// Populate the capacity for simple CapacityNode
if (cache.getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1
&& cache instanceof ResourceControllerDataProvider) {
final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache;
dataProvider.populateSimpleCapacitySetUsage(resourceToRebalance.keySet(),
currentStateExcludingUnknown);
}
}

// update all pending messages to CurrentStateOutput.
Expand Down
18 changes: 17 additions & 1 deletion helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* under the License.
*/

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
Expand All @@ -33,6 +32,7 @@
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

import com.google.common.base.Preconditions;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
Expand All @@ -50,6 +50,7 @@
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.rebalancer.ConditionBasedRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
Expand Down Expand Up @@ -365,6 +366,14 @@ protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
configAccessor.setClusterConfig(clusterName, clusterConfig);
}

protected void setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient,
String clusterName, int maxPartitionAllowed) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
configAccessor.setClusterConfig(clusterName, clusterConfig);
}

protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
Expand All @@ -384,6 +393,13 @@ protected IdealState createResourceWithWagedRebalance(String clusterName, String
-1, WagedRebalancer.class.getName(), null);
}

protected IdealState createResourceWithConditionBasedRebalance(String clusterName, String db,
String stateModel, int numPartition, int replica, int minActiveReplica,
String rebalanceStrategy) {
return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica, -1,
ConditionBasedRebalancer.class.getName(), rebalanceStrategy);
}

private IdealState createResource(String clusterName, String db, String stateModel,
int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName,
String rebalanceStrategy) {
Expand Down
Loading
Loading