ZkBucketDataAccessor always complete scheduled GC (#2873)
GrantPSpencer authored Aug 16, 2024
1 parent e1799b4 commit 321f4eb
Showing 2 changed files with 66 additions and 27 deletions.
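The diff below changes how stale-version garbage collection is scheduled: a pending GC task is no longer cancelled and re-created on every write (which could postpone cleanup indefinitely under a steady write load), the task removes itself from the tracking map when it fires, and the current version is read from the last-successful-write ZNode at execution time instead of being captured at scheduling time. A minimal sketch of that scheduling pattern, assuming a simplified stand-in class with the ZooKeeper cleanup stubbed out (names other than scheduleStaleVersionGC/deleteStaleVersions are illustrative, not the real Helix fields):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for ZkBucketDataAccessor's GC scheduling.
class StaleVersionGcSketch {
  private final ScheduledExecutorService gcThread = Executors.newSingleThreadScheduledExecutor();
  private final Map<String, ScheduledFuture> gcTaskFutureMap = new ConcurrentHashMap<>();
  private final long versionTtlMs;

  StaleVersionGcSketch(long versionTtlMs) {
    this.versionTtlMs = versionTtlMs;
  }

  synchronized void scheduleStaleVersionGC(String rootPath) {
    // At most one pending GC task per path; an already-scheduled task is left
    // alone rather than cancelled, so it is guaranteed to run.
    if (gcTaskFutureMap.containsKey(rootPath)) {
      return;
    }
    gcTaskFutureMap.put(rootPath, gcThread.schedule(() -> {
      try {
        // Drop the map entry first so a write racing with this task can
        // schedule the next GC cycle.
        gcTaskFutureMap.remove(rootPath);
        deleteStaleVersions(rootPath);
      } catch (Exception ex) {
        System.err.println("Failed to delete the stale versions: " + ex);
      }
    }, versionTtlMs, TimeUnit.MILLISECONDS));
  }

  private void deleteStaleVersions(String rootPath) {
    // In the real accessor this reads the last-successful-write version under
    // rootPath at execution time and deletes every older version; stubbed out here.
  }
}

Because staleness is evaluated against the version read when the task actually runs, completing the already-scheduled GC is safe even if newer versions were written in the meantime.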
@@ -23,9 +23,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
- import java.util.HashMap;
import java.util.List;
import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -79,7 +79,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
private final ZkSerializer _zkSerializer;
private final RealmAwareZkClient _zkClient;
private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
- private final Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
+ private final Map<String, ScheduledFuture> _gcTaskFutureMap = new ConcurrentHashMap<>();
private boolean _usesExternalZkClient = false;

/**
@@ -239,7 +239,7 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
}

// 5. Update the timer for GC
- updateGCTimer(rootPath, version);
+ scheduleStaleVersionGC(rootPath);
return true;
}

@@ -268,13 +268,7 @@ public void disconnect() {

private HelixProperty compressedBucketRead(String path) {
// 1. Get the version to read
- byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
- null, AccessOption.PERSISTENT);
- if (binaryVersionToRead == null) {
- throw new ZkNoNodeException(
- String.format("Last successful write ZNode does not exist for path: %s", path));
- }
- String versionToRead = new String(binaryVersionToRead);
+ String versionToRead = getLastSuccessfulWriteVersion(path);

// 2. Get the metadata map
byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
@@ -354,34 +348,38 @@ public void finalize() {
close();
}

- private synchronized void updateGCTimer(String rootPath, long currentVersion) {
+ private synchronized void scheduleStaleVersionGC(String rootPath) {
+ // If GC already scheduled, return early
if (_gcTaskFutureMap.containsKey(rootPath)) {
- _gcTaskFutureMap.remove(rootPath).cancel(false);
+ return;
}
- // Schedule the gc task with TTL
+ // Schedule GC task
_gcTaskFutureMap.put(rootPath, GC_THREAD.schedule(() -> {
- try {
- deleteStaleVersions(rootPath, currentVersion);
- } catch (Exception ex) {
- LOG.error("Failed to delete the stale versions.", ex);
- }
- }, _versionTTLms, TimeUnit.MILLISECONDS));
- }
+ try {
+ _gcTaskFutureMap.remove(rootPath);
+ deleteStaleVersions(rootPath);
+ } catch (Exception ex) {
+ LOG.error("Failed to delete the stale versions.", ex);
+ }
+ }, _versionTTLms, TimeUnit.MILLISECONDS));
+ }

/**
* Deletes all stale versions.
* @param rootPath
- * @param currentVersion
*/
- private void deleteStaleVersions(String rootPath, long currentVersion) {
+ private void deleteStaleVersions(String rootPath) {
+ // Get most recent write version
+ String currentVersionStr = getLastSuccessfulWriteVersion(rootPath);
+
// Get all children names under path
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
if (children == null || children.isEmpty()) {
// The whole path has been deleted so return immediately
return;
}
List<String> pathsToDelete =
- getPathsToDelete(rootPath, filterChildrenNames(children, currentVersion));
+ getPathsToDelete(rootPath, filterChildrenNames(children, Long.parseLong(currentVersionStr)));
for (String pathToDelete : pathsToDelete) {
// TODO: Should be batch delete but it doesn't work. It's okay since this runs async
_zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
@@ -429,4 +427,14 @@ private List<String> getPathsToDelete(String path, List<String> staleVersions) {
staleVersions.forEach(ver -> pathsToDelete.add(path + "/" + ver));
return pathsToDelete;
}

+ private String getLastSuccessfulWriteVersion(String path) {
+ byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+ null, AccessOption.PERSISTENT);
+ if (binaryVersionToRead == null) {
+ throw new ZkNoNodeException(
+ String.format("Last successful write ZNode does not exist for path: %s", path));
+ }
+ return new String(binaryVersionToRead);
+ }
}
@@ -58,15 +58,17 @@ public class TestZkBucketDataAccessor extends ZkTestBase {

private final ZNRecord record = new ZNRecord(NAME_KEY);

+ private HelixZkClient _zkClient;
private BucketDataAccessor _bucketDataAccessor;
private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
+ private BucketDataAccessor _fastGCBucketDataAccessor;

@BeforeClass
public void beforeClass() {
// Initialize ZK accessors for testing
- HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+ _zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
- zkClient.setZkSerializer(new ZkSerializer() {
+ _zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
if (data instanceof byte[]) {
@@ -80,8 +82,8 @@ public Object deserialize(byte[] data) throws ZkMarshallingError {
return data;
}
});
- _zkBaseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
- _bucketDataAccessor = new ZkBucketDataAccessor(zkClient, 50 * 1024, VERSION_TTL_MS);
+ _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
+ _bucketDataAccessor = new ZkBucketDataAccessor(_zkClient, 50 * 1024, VERSION_TTL_MS);

// Fill in some data for the record
record.setSimpleField(NAME_KEY, NAME_KEY);
@@ -193,6 +195,35 @@ public void testLargeWriteAndRead() throws IOException {
Assert.assertEquals(readRecord, property);
}

+ /**
+ * Test to ensure bucket GC still occurs in high frequency write scenarios.
+ */
+ @Test(dependsOnMethods = "testLargeWriteAndRead")
+ public void testGCScheduler() throws IOException, InterruptedException {
+ long gcTTL = 1000; // GC scheduled for 1 second after write
+ ZkBucketDataAccessor fastGCBucketDataAccessor = new ZkBucketDataAccessor(_zkClient, 50 * 1024, gcTTL);
+
+ int writeCount = 10;
+ for (int i = 0; i < writeCount; i++) {
+ Thread.sleep(gcTTL / 2);
+ Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
+ }
+ List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
+ // remove from list if name can't be parsed into long (aka not a version count node)
+ children.removeIf(name -> {
+ try {
+ Long.parseLong(name);
+ return false;
+ } catch (NumberFormatException e) {
+ return true;
+ }
+ });
+
+ Assert.assertTrue(children.size() < writeCount,
+ "Expecting stale versions to be cleaned up. Children were: " + children);
+ System.out.print("Children after GC: " + children);
+ }
+
private HelixProperty createLargeHelixProperty(String name, int numEntries) {
HelixProperty property = new HelixProperty(name);
for (int i = 0; i < numEntries; i++) {