Skip to content

Commit

Permalink
Fix race condition in IdealStateGroupCommit (#14237)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Oct 16, 2024
1 parent bb51514 commit 6061b89
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private static class Entry {
final Function<IdealState, IdealState> _updater;
IdealState _updatedIdealState = null;
AtomicBoolean _sent = new AtomicBoolean(false);
Throwable _exception;

Entry(String resourceName, Function<IdealState, IdealState> updater) {
_resourceName = resourceName;
Expand Down Expand Up @@ -106,8 +107,8 @@ public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNum
* @param updater the idealState updater to be applied
* @return IdealState if the update is successful, null if not
*/
public IdealState commit(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) {
public IdealState commit(HelixManager helixManager, String resourceName, Function<IdealState, IdealState> updater,
RetryPolicy retryPolicy, boolean noChangeOk) {
Queue queue = getQueue(resourceName);
Entry entry = new Entry(resourceName, updater);

Expand All @@ -120,39 +121,41 @@ public IdealState commit(HelixManager helixManager, String resourceName,
// All pending entries have been processed, the updatedIdealState should be set.
return entry._updatedIdealState;
}
// remove from queue
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
IdealState idealState = dataAccessor.getProperty(idealStateKey);

// Make a copy of the idealState above to pass it to the updater
// NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
// list fields
IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);

/**
* If the local cache does not contain a value, need to check if there is a
* value in ZK; use it as initial value if exists
*/
IdealState updatedIdealState = first._updater.apply(idealStateCopy);
first._updatedIdealState = updatedIdealState;
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
if (!ent._resourceName.equals(mergedResourceName)) {
continue;
IdealState response = updateIdealState(helixManager, resourceName, idealState -> {
IdealState updatedIdealState = idealState;
if (!processed.isEmpty()) {
queue._pending.addAll(processed);
processed.clear();
}
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
if (!ent._resourceName.equals(resourceName)) {
continue;
}
processed.add(ent);
it.remove();
updatedIdealState = ent._updater.apply(updatedIdealState);
ent._updatedIdealState = updatedIdealState;
ent._exception = null;
}
processed.add(ent);
updatedIdealState = ent._updater.apply(idealStateCopy);
ent._updatedIdealState = updatedIdealState;
it.remove();
return updatedIdealState;
}, retryPolicy, noChangeOk);
if (response == null) {
RuntimeException ex = new RuntimeException("Failed to update IdealState");
for (Entry ent : processed) {
ent._exception = ex;
ent._updatedIdealState = null;
}
throw ex;
}
} catch (Throwable e) {
// If there is an exception, set the exception for all processed entries
for (Entry ent : processed) {
ent._exception = e;
ent._updatedIdealState = null;
}
IdealState finalUpdatedIdealState = updatedIdealState;
updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
retryPolicy, noChangeOk);
throw e;
} finally {
queue._running.set(null);
for (Entry e : processed) {
Expand All @@ -176,6 +179,10 @@ public IdealState commit(HelixManager helixManager, String resourceName,
}
}
}
if (entry._exception != null) {
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName,
entry._exception);
}
return entry._updatedIdealState;
}

Expand Down Expand Up @@ -298,7 +305,7 @@ private boolean shouldCompress(IdealState is) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L);
}
return idealStateWrapper._idealState;
} catch (Exception e) {
} catch (Throwable e) {
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pinot.controller.helix;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
Expand All @@ -32,60 +35,93 @@
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class IdealStateGroupCommitTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class);
private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommitTest.class);
private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
private static final String TABLE_NAME = "potato_OFFLINE";
private static final int NUM_UPDATES = 2400;
private static final String TABLE_NAME_PREFIX = "potato_";
private static final int NUM_PROCESSORS = 100;
private static final int NUM_UPDATES = 2000;
private static final int NUM_TABLES = 20;

private ExecutorService _executorService;

@BeforeClass
public void setUp()
throws Exception {
TEST_INSTANCE.setupSharedStateAndValidate();
_executorService = Executors.newFixedThreadPool(100);
}

@BeforeMethod
public void beforeTest() {
for (int i = 0; i < NUM_UPDATES; i++) {
String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE";
IdealState idealState = new IdealState(tableName);
idealState.setStateModelDefRef("OnlineOffline");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
idealState.setReplicas("1");
idealState.setNumPartitions(0);
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(), tableName, idealState);
ControllerMetrics.get().removeTableMeter(tableName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS);
}
}

IdealState idealState = new IdealState(TABLE_NAME);
idealState.setStateModelDefRef("OnlineOffline");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
idealState.setReplicas("1");
idealState.setNumPartitions(0);
TEST_INSTANCE.getHelixAdmin()
.addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, idealState);
@AfterMethod
public void afterTest() {
for (int i = 0; i < NUM_UPDATES; i++) {
String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE";
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(), tableName);
}
}

@AfterClass
public void tearDown() {
_executorService.shutdown();
TEST_INSTANCE.cleanup();
}

@Test
@Test(invocationCount = 5)
public void testGroupCommit()
throws InterruptedException {
final IdealStateGroupCommit commit = new IdealStateGroupCommit();
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400);
List<IdealStateGroupCommit> groupCommitList = new ArrayList<>();
for (int i = 0; i < NUM_PROCESSORS; i++) {
groupCommitList.add(new IdealStateGroupCommit());
}
for (int i = 0; i < NUM_UPDATES; i++) {
Runnable runnable = new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, TABLE_NAME, i);
newFixedThreadPool.submit(runnable);
for (int j = 0; j < NUM_TABLES; j++) {
String tableName = TABLE_NAME_PREFIX + j + "_OFFLINE";
IdealStateGroupCommit commit = groupCommitList.get(new Random().nextInt(NUM_PROCESSORS));
Runnable runnable = new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, tableName, i);
_executorService.submit(runnable);
}
}
IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
while (idealState.getNumPartitions() < NUM_UPDATES) {
Thread.sleep(500);
idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
for (int i = 0; i < NUM_TABLES; i++) {
String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE";
IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
while (idealState.getNumPartitions() < NUM_UPDATES) {
Thread.sleep(500);
idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
}
Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
ControllerMetrics controllerMetrics = ControllerMetrics.get();
long idealStateUpdateSuccessCount =
controllerMetrics.getMeteredTableValue(tableName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
LOGGER.info("{} IdealState update are successfully commited with {} times zk updates.", NUM_UPDATES,
idealStateUpdateSuccessCount);
}
Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
ControllerMetrics controllerMetrics = ControllerMetrics.get();
long idealStateUpdateSuccessCount =
controllerMetrics.getMeteredTableValue(TABLE_NAME, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
LOGGER.info("{} IdealState update are successfully commited with {} times zk updates.", NUM_UPDATES,
idealStateUpdateSuccessCount);
}
}

class IdealStateUpdater implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommitTest.class);

private final HelixManager _helixManager;
private final IdealStateGroupCommit _commit;
private final String _tableName;
Expand All @@ -100,13 +136,22 @@ public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit commit

@Override
public void run() {
_commit.commit(_helixManager, _tableName, new Function<IdealState, IdealState>() {
Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
@Override
public IdealState apply(IdealState idealState) {
idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE");
return idealState;
}
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false);
HelixHelper.getTableIdealState(_helixManager, _tableName);
};

while (true) {
try {
if (_commit.commit(_helixManager, _tableName, updater, RetryPolicies.noDelayRetryPolicy(1), false) != null) {
break;
}
} catch (Throwable e) {
LOGGER.warn("IdealState updater {} failed to commit.", _i, e);
}
}
}
}

0 comments on commit 6061b89

Please sign in to comment.