From 6061b8926a605012c65803b91c1039858a56b018 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Wed, 16 Oct 2024 20:24:54 +0800 Subject: [PATCH] Fix race condition in IdealStateGroupCommit (#14237) --- .../utils/helix/IdealStateGroupCommit.java | 75 +++++++------ .../helix/IdealStateGroupCommitTest.java | 103 +++++++++++++----- 2 files changed, 115 insertions(+), 63 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java index ea74fb18e27..f7e7981a1ad 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java @@ -72,6 +72,7 @@ private static class Entry { final Function _updater; IdealState _updatedIdealState = null; AtomicBoolean _sent = new AtomicBoolean(false); + Throwable _exception; Entry(String resourceName, Function updater) { _resourceName = resourceName; @@ -106,8 +107,8 @@ public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNum * @param updater the idealState updater to be applied * @return IdealState if the update is successful, null if not */ - public IdealState commit(HelixManager helixManager, String resourceName, - Function updater, RetryPolicy retryPolicy, boolean noChangeOk) { + public IdealState commit(HelixManager helixManager, String resourceName, Function updater, + RetryPolicy retryPolicy, boolean noChangeOk) { Queue queue = getQueue(resourceName); Entry entry = new Entry(resourceName, updater); @@ -120,39 +121,41 @@ public IdealState commit(HelixManager helixManager, String resourceName, // All pending entries have been processed, the updatedIdealState should be set. return entry._updatedIdealState; } - // remove from queue - Entry first = queue._pending.poll(); - processed.add(first); - String mergedResourceName = first._resourceName; - HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); - PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); - IdealState idealState = dataAccessor.getProperty(idealStateKey); - - // Make a copy of the idealState above to pass it to the updater - // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and - // list fields - IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); - - /** - * If the local cache does not contain a value, need to check if there is a - * value in ZK; use it as initial value if exists - */ - IdealState updatedIdealState = first._updater.apply(idealStateCopy); - first._updatedIdealState = updatedIdealState; - Iterator it = queue._pending.iterator(); - while (it.hasNext()) { - Entry ent = it.next(); - if (!ent._resourceName.equals(mergedResourceName)) { - continue; + IdealState response = updateIdealState(helixManager, resourceName, idealState -> { + IdealState updatedIdealState = idealState; + if (!processed.isEmpty()) { + queue._pending.addAll(processed); + processed.clear(); + } + Iterator it = queue._pending.iterator(); + while (it.hasNext()) { + Entry ent = it.next(); + if (!ent._resourceName.equals(resourceName)) { + continue; + } + processed.add(ent); + it.remove(); + updatedIdealState = ent._updater.apply(updatedIdealState); + ent._updatedIdealState = updatedIdealState; + ent._exception = null; } - processed.add(ent); - updatedIdealState = ent._updater.apply(idealStateCopy); - ent._updatedIdealState = updatedIdealState; - it.remove(); + return updatedIdealState; + }, retryPolicy, noChangeOk); + if (response == null) { + RuntimeException ex = new RuntimeException("Failed to update IdealState"); + for (Entry ent : processed) { + ent._exception = ex; + ent._updatedIdealState = null; + } + throw ex; + } + } catch (Throwable e) { + // If there is an exception, set the exception for all processed entries + for (Entry ent : processed) { + ent._exception = e; + ent._updatedIdealState = null; } - IdealState finalUpdatedIdealState = updatedIdealState; - updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState, - retryPolicy, noChangeOk); + throw e; } finally { queue._running.set(null); for (Entry e : processed) { @@ -176,6 +179,10 @@ public IdealState commit(HelixManager helixManager, String resourceName, } } } + if (entry._exception != null) { + throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, + entry._exception); + } return entry._updatedIdealState; } @@ -298,7 +305,7 @@ private boolean shouldCompress(IdealState is) { controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L); } return idealStateWrapper._idealState; - } catch (Exception e) { + } catch (Throwable e) { if (controllerMetrics != null) { controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java index ffe39764a41..40304d14c6a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.controller.helix; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; @@ -32,60 +35,93 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class IdealStateGroupCommitTest { - private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommitTest.class); private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance(); - private static final String TABLE_NAME = "potato_OFFLINE"; - private static final int NUM_UPDATES = 2400; + private static final String TABLE_NAME_PREFIX = "potato_"; + private static final int NUM_PROCESSORS = 100; + private static final int NUM_UPDATES = 2000; + private static final int NUM_TABLES = 20; + + private ExecutorService _executorService; @BeforeClass public void setUp() throws Exception { TEST_INSTANCE.setupSharedStateAndValidate(); + _executorService = Executors.newFixedThreadPool(100); + } + + @BeforeMethod + public void beforeTest() { + for (int i = 0; i < NUM_UPDATES; i++) { + String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE"; + IdealState idealState = new IdealState(tableName); + idealState.setStateModelDefRef("OnlineOffline"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + idealState.setReplicas("1"); + idealState.setNumPartitions(0); + TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(), tableName, idealState); + ControllerMetrics.get().removeTableMeter(tableName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS); + } + } - IdealState idealState = new IdealState(TABLE_NAME); - idealState.setStateModelDefRef("OnlineOffline"); - idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - idealState.setReplicas("1"); - idealState.setNumPartitions(0); - TEST_INSTANCE.getHelixAdmin() - .addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, idealState); + @AfterMethod + public void afterTest() { + for (int i = 0; i < NUM_UPDATES; i++) { + String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE"; + TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(), tableName); + } } @AfterClass public void tearDown() { + _executorService.shutdown(); TEST_INSTANCE.cleanup(); } - @Test + @Test(invocationCount = 5) public void testGroupCommit() throws InterruptedException { - final IdealStateGroupCommit commit = new IdealStateGroupCommit(); - ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400); + List groupCommitList = new ArrayList<>(); + for (int i = 0; i < NUM_PROCESSORS; i++) { + groupCommitList.add(new IdealStateGroupCommit()); + } for (int i = 0; i < NUM_UPDATES; i++) { - Runnable runnable = new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, TABLE_NAME, i); - newFixedThreadPool.submit(runnable); + for (int j = 0; j < NUM_TABLES; j++) { + String tableName = TABLE_NAME_PREFIX + j + "_OFFLINE"; + IdealStateGroupCommit commit = groupCommitList.get(new Random().nextInt(NUM_PROCESSORS)); + Runnable runnable = new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, tableName, i); + _executorService.submit(runnable); + } } - IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME); - while (idealState.getNumPartitions() < NUM_UPDATES) { - Thread.sleep(500); - idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME); + for (int i = 0; i < NUM_TABLES; i++) { + String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE"; + IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName); + while (idealState.getNumPartitions() < NUM_UPDATES) { + Thread.sleep(500); + idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName); + } + Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES); + ControllerMetrics controllerMetrics = ControllerMetrics.get(); + long idealStateUpdateSuccessCount = + controllerMetrics.getMeteredTableValue(tableName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count(); + Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES); + LOGGER.info("{} IdealState update are successfully commited with {} times zk updates.", NUM_UPDATES, + idealStateUpdateSuccessCount); } - Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES); - ControllerMetrics controllerMetrics = ControllerMetrics.get(); - long idealStateUpdateSuccessCount = - controllerMetrics.getMeteredTableValue(TABLE_NAME, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count(); - Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES); - LOGGER.info("{} IdealState update are successfully commited with {} times zk updates.", NUM_UPDATES, - idealStateUpdateSuccessCount); } } class IdealStateUpdater implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommitTest.class); + private final HelixManager _helixManager; private final IdealStateGroupCommit _commit; private final String _tableName; @@ -100,13 +136,22 @@ public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit commit @Override public void run() { - _commit.commit(_helixManager, _tableName, new Function() { + Function updater = new Function() { @Override public IdealState apply(IdealState idealState) { idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE"); return idealState; } - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false); - HelixHelper.getTableIdealState(_helixManager, _tableName); + }; + + while (true) { + try { + if (_commit.commit(_helixManager, _tableName, updater, RetryPolicies.noDelayRetryPolicy(1), false) != null) { + break; + } + } catch (Throwable e) { + LOGGER.warn("IdealState updater {} failed to commit.", _i, e); + } + } } }