Skip to content

Commit

Permalink
Fix BestPossibleExternalViewVerifier to use a ZkClient that has the s…
Browse files Browse the repository at this point in the history
…erializer set to ByteArraySerializer (apache#2776)

* Fix BestPossibleExternalViewVerifier to use a ZkClient that has the serializer set to ByteArraySerializer so it can read the best possible state from the assignment metadata store. Fix BestPossibleExternalViewVerifier to actually calculate BEST_POSSIBLE instead of returning the last state persisted to ZK, because we now need to consider that the handleDelayedRebalanceMinActiveReplica result is not persisted to ZK (apache#2447). Fix handleDelayedRebalanceMinActiveReplica modifying the in-memory _bestPossibleState in the _assignmentMetadataStore, which was causing the best possible state to be continuously persisted until handleDelayedRebalanceMinActiveReplica was no longer kicking in.
  • Loading branch information
zpinto authored and Charanya Sudharsanan committed Aug 5, 2024
1 parent 43e8db2 commit ff83bdc
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ResourceAssignment;
Expand Down Expand Up @@ -86,7 +88,11 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment(
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment());
currentBestAssignment =
assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
;
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,23 @@
import java.util.Set;

import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.RebalanceUtil;
Expand All @@ -59,8 +55,13 @@
import org.slf4j.LoggerFactory;

/**
* verifier that the ExternalViews of given resources (or all resources in the cluster)
* match its best possible mapping states.
* Verifies that the ExternalViews of the given resources (or all resources in the cluster)
* match their best possible mapping states. The best possible mapping states are computed
* by running the BestPossibleStateCalc stage with the same inputs that the controller would
* use to calculate the best possible state. The mappings produced by this stage are compared
* to the external view to ensure that they match. When they match, the cluster has converged.
* Note: The best possible state compared to the external view includes the non-persisted state
* mappings generated when handling MIN_ACTIVE replicas.
*/
public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
private static Logger LOG = LoggerFactory.getLogger(BestPossibleExternalViewVerifier.class);
Expand Down Expand Up @@ -433,7 +434,10 @@ private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider
RebalanceUtil.runStage(event, new CurrentStateComputationStage());
// Note the readOnlyWagedRebalancer is just for one time usage

try (ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(_zkClient);
try (
// Pass the zkAddress to constructor to ensure the correct ZkClient is created with ByteArraySerializer
ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(
_zkClient.getServers());
DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(zkBucketDataAccessor,
cache.getClusterName(), cache.getClusterConfig().getGlobalRebalancePreference())) {
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
Expand Down Expand Up @@ -462,14 +466,5 @@ public DryrunWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, String c
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
super(zkBucketDataAccessor, clusterName, preferences);
}

/**
 * Produces the best possible assignment for the resources named in {@code resourceMap} by
 * delegating to {@code getBestPossibleAssignment} with this rebalancer's assignment metadata
 * store, instead of performing any computation in this override.
 *
 * @param clusterData cluster data provider; not read by this override
 * @param resourceMap resources to look up; only the key set (resource names) is used
 * @param activeNodes active node names; not read by this override
 * @param currentStateOutput current state output, forwarded to the delegated lookup
 * @param algorithm rebalance algorithm; not read by this override
 * @return the map returned by the delegated {@code getBestPossibleAssignment} call
 * @throws HelixRebalanceException propagated from the delegated call
 */
@Override
protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
    ResourceControllerDataProvider clusterData,
    Map<String, Resource> resourceMap,
    Set<String> activeNodes,
    CurrentStateOutput currentStateOutput,
    RebalanceAlgorithm algorithm)
    throws HelixRebalanceException {
  // clusterData, activeNodes and algorithm are intentionally ignored here.
  return this.getBestPossibleAssignment(
      this.getAssignmentMetadataStore(),
      currentStateOutput,
      resourceMap.keySet());
}
}
}

0 comments on commit ff83bdc

Please sign in to comment.