From ff83bdce84b3608399a78af2d242eeaae22d1e44 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Wed, 13 Mar 2024 20:00:32 -0700 Subject: [PATCH] Fix BestPossibleExternalViewVerifier to use a ZkClient that has the serializer set to ByteArraySerializer (#2776) * Fix BestPossibleExternalViewVerifier to use a ZkClient that has the serializer set to ByteArraySerializer so it can read the assignment meta store best possible state. Fix BestPossibleExternalViewVerifier to actually calculate BEST_POSSIBLE instead of returning last persisted to ZK because we now need to consider handleDelayedRebalanceMinActiveReplica not being persisted to ZK(#2447). Fix handleDelayedRebalanceMinActiveReplica modifying in-memory _bestPossibleState in the _assignmentMetadataStore which was causing best possible state to continuosly be persisted until handleDelayedRebalanceMinActiveReplica wasn't kicking in anymore. --- .../rebalancer/waged/AssignmentManager.java | 8 +++++- .../BestPossibleExternalViewVerifier.java | 27 ++++++++----------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java index 475e8aad14..8cb089cb9e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java @@ -23,6 +23,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ResourceAssignment; @@ -86,7 +88,11 @@ public Map getBestPossibleAssignment( if (assignmentMetadataStore != null) { try { _stateReadLatency.startMeasuringLatency(); - currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment()); + currentBestAssignment = + assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, + entry -> new ResourceAssignment(entry.getValue().getRecord()))); + ; _stateReadLatency.endMeasuringLatency(); } catch (Exception ex) { throw new HelixRebalanceException( diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java index 1997bea06d..0b0926dda3 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java @@ -30,19 +30,16 @@ import java.util.Set; import org.apache.helix.HelixDefinedState; -import org.apache.helix.HelixRebalanceException; import org.apache.helix.PropertyKey; import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer; -import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.controller.stages.CurrentStateComputationStage; -import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.ResourceComputationStage; import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.model.ClusterConfig; @@ -50,7 +47,6 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.task.TaskConstants; import org.apache.helix.util.RebalanceUtil; @@ -59,8 +55,13 @@ import org.slf4j.LoggerFactory; /** - * verifier that the ExternalViews of given resources (or all resources in the cluster) - * match its best possible mapping states. + * Verify that the ExternalViews of given resources (or all resources in the cluster) + * match its best possible mapping states. The best possible mapping states are computed + * by running the BestPossibleStateCalc stage with the same inputs that the controller would + * use to calculate the best possible state. The mappings produced by this stage are compared + * to the external view to ensure that they match. When they match, the cluster has converged. + * Note: The best possible state compared to the external view includes the non-persisted state + * mappings generated when handling MIN_ACTIVE replicas. */ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { private static Logger LOG = LoggerFactory.getLogger(BestPossibleExternalViewVerifier.class); @@ -433,7 +434,10 @@ private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider RebalanceUtil.runStage(event, new CurrentStateComputationStage()); // Note the readOnlyWagedRebalancer is just for one time usage - try (ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(_zkClient); + try ( + // Pass the zkAddress to constructor to ensure the correct ZkClient is created with ByteArraySerializer + ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor( + _zkClient.getServers()); DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(zkBucketDataAccessor, cache.getClusterName(), cache.getClusterConfig().getGlobalRebalancePreference())) { event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer); @@ -462,14 +466,5 @@ public DryrunWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, String c Map preferences) { super(zkBucketDataAccessor, clusterName, preferences); } - - @Override - protected Map computeBestPossibleAssignment( - ResourceControllerDataProvider clusterData, Map resourceMap, - Set activeNodes, CurrentStateOutput currentStateOutput, - RebalanceAlgorithm algorithm) throws HelixRebalanceException { - return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput, - resourceMap.keySet()); - } } }