Skip to content

Commit

Permalink
[server][dvc] generate snapshot after endBatchWrite complete (#1162)
Browse files Browse the repository at this point in the history
  • Loading branch information
jingy-li authored Sep 11, 2024
1 parent 233d3fd commit 7e66fd3
Show file tree
Hide file tree
Showing 16 changed files with 317 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public DaVinciBackend(
}

if (backendConfig.isBlobTransferManagerEnabled()) {
blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerAndStart(
blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart(
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(),
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(),
configLoader.getVeniceServerConfig().getRocksDBPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.Checkpoint;
Expand Down Expand Up @@ -161,4 +163,37 @@ protected Checkpoint createCheckpoint(RocksDB rocksDB) {
return Checkpoint.create(rocksDB);
}

/**
* util method to create a snapshot for batch only
* It will check the snapshot directory and delete it if it exists, then generate a new snapshot
*/
public static void createSnapshotForBatch(RocksDB rocksDB, String fullPathForPartitionDBSnapshot) {
LOGGER.info("Creating snapshot for batch in directory: {}", fullPathForPartitionDBSnapshot);

// clean up the snapshot directory if it exists
File partitionSnapshotDir = new File(fullPathForPartitionDBSnapshot);
if (partitionSnapshotDir.exists()) {
LOGGER.info("Snapshot directory already exists, deleting old snapshots at {}", fullPathForPartitionDBSnapshot);
try {
FileUtils.deleteDirectory(partitionSnapshotDir);
} catch (IOException e) {
throw new VeniceException(
"Failed to delete the existing snapshot directory: " + fullPathForPartitionDBSnapshot,
e);
}
}

try {
LOGGER.info("Start creating snapshots for batch in directory: {}", fullPathForPartitionDBSnapshot);

Checkpoint checkpoint = Checkpoint.create(rocksDB);
checkpoint.createCheckpoint(fullPathForPartitionDBSnapshot);

LOGGER.info("Finished creating snapshots for batch in directory: {}", fullPathForPartitionDBSnapshot);
} catch (RocksDBException e) {
throw new VeniceException(
"Received exception during RocksDB's snapshot creation in directory " + fullPathForPartitionDBSnapshot,
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.blobtransfer.BlobTransferManager;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class HelixParticipationService extends AbstractVeniceService
private HelixPartitionStatusAccessor partitionPushStatusAccessor;
private ThreadPoolExecutor leaderFollowerHelixStateTransitionThreadPool;
private VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor;
private BlobTransferManager<Void> blobTransferManager;
private final HeartbeatMonitoringService heartbeatMonitoringService;

// This is ONLY for testing purpose.
Expand All @@ -111,7 +113,8 @@ public HelixParticipationService(
int port,
String hostname,
CompletableFuture<SafeHelixManager> managerFuture,
HeartbeatMonitoringService heartbeatMonitoringService) {
HeartbeatMonitoringService heartbeatMonitoringService,
BlobTransferManager blobTransferManager) {
this.ingestionService = storeIngestionService;
this.storageService = storageService;
this.clusterName = clusterName;
Expand All @@ -125,6 +128,7 @@ public HelixParticipationService(
this.metricsRepository = metricsRepository;
this.instance = new Instance(participantName, hostname, port);
this.managerFuture = managerFuture;
this.blobTransferManager = blobTransferManager;
this.partitionPushStatusAccessorFuture = new CompletableFuture<>();
if (!(storeIngestionService instanceof KafkaStoreIngestionService)) {
throw new VeniceException("Expecting " + KafkaStoreIngestionService.class.getName() + " for ingestion backend!");
Expand All @@ -134,8 +138,8 @@ public HelixParticipationService(
storageMetadataService,
(KafkaStoreIngestionService) storeIngestionService,
storageService,
null,
null);
blobTransferManager,
veniceConfigLoader.getVeniceServerConfig());
}

// Set corePoolSize and maxPoolSize as the same value, but enable allowCoreThreadTimeOut. So the expected
Expand Down Expand Up @@ -252,6 +256,13 @@ public void stopInner() throws IOException {
LOGGER.info("Helix Manager is null.");
}
ingestionBackend.close();
if (blobTransferManager != null) {
try {
blobTransferManager.close();
} catch (Exception e) {
LOGGER.error("Swallowed an exception while trying to close the blobTransferManager.", e);
}
}
LOGGER.info("Closed VeniceIngestionBackend.");
leaderFollowerParticipantModelFactory.shutDownExecutor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition
storeVersion,
partition);
};

// TODO: need to differentiate that's DVC or server. Right now, it doesn't tell so both components can create,
// though
// Only DVC would create blobTransferManager.
// TODO: remove hybrid check after blob transfer in hybrid mode is fully supported
if (!storeAndVersion.getFirst().isBlobTransferEnabled() || storeAndVersion.getFirst().isHybrid()
|| blobTransferManager == null) {
runnable.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final Optional<HybridStoreConfig> hybridStoreConfig;
protected final Consumer<DataValidationException> divErrorMetricCallback;
private final ExecutorService missingSOPCheckExecutor = Executors.newSingleThreadExecutor();

private final VeniceStoreVersionConfig storeConfig;
protected final long readCycleDelayMs;
protected final long emptyPollSleepMs;

Expand Down Expand Up @@ -348,6 +348,7 @@ public StoreIngestionTask(
Optional<ObjectCacheBackend> cacheBackend,
Function<Integer, DaVinciRecordTransformer> getRecordTransformer,
Queue<VeniceNotifier> notifiers) {
this.storeConfig = storeConfig;
this.readCycleDelayMs = storeConfig.getKafkaReadCycleDelayMs();
this.emptyPollSleepMs = storeConfig.getKafkaEmptyPollSleepMs();
this.databaseSyncBytesIntervalForTransactionalMode = storeConfig.getDatabaseSyncBytesIntervalForTransactionalMode();
Expand Down Expand Up @@ -2679,6 +2680,13 @@ protected void processEndOfPush(
}
}

/**
* Generate snapshot after batch write is done.
*/
if (storeConfig.isBlobTransferEnabled()) {
storageEngine.createSnapshot(storagePartitionConfig);
}

/**
* The checksum verification is not used after EOP, so completely reset it.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,15 @@ public synchronized void endBatchWrite(StoragePartitionConfig storagePartitionCo
}
}

/**
* Create snapshot for the given partition
* @param storagePartitionConfig
*/
public synchronized void createSnapshot(StoragePartitionConfig storagePartitionConfig) {
AbstractStoragePartition partition = getPartitionOrThrow(storagePartitionConfig.getPartitionId());
partition.createSnapshot();
}

private void executeWithSafeGuard(int partitionId, Runnable runnable) {
executeWithSafeGuard(partitionId, () -> {
runnable.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ public ReplicationMetadataRocksDBStoragePartition(
super.getOptions(),
fullPathForTempSSTFileDir,
true,
rocksDBServerConfig,
super.getBlobTransferEnabled());
rocksDBServerConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public ReusableObjects() {
private long recordNumInCurrentSSTFile = 0;
private long recordNumInAllSSTFiles = 0;
private String fullPathForTempSSTFileDir;
private final String fullPathForPartitionDBSnapshot;
private Optional<Supplier<byte[]>> expectedChecksumSupplier;
private final String storeName;
private final int partitionId;
Expand Down Expand Up @@ -108,15 +107,12 @@ public RocksDBSstFileWriter(
Options options,
String fullPathForTempSSTFileDir,
boolean isRMD,
RocksDBServerConfig rocksDBServerConfig,
boolean blobTransferEnabled) {
RocksDBServerConfig rocksDBServerConfig) {
this.storeName = storeName;
this.partitionId = partitionId;
this.envOptions = envOptions;
this.options = options;
this.fullPathForTempSSTFileDir = fullPathForTempSSTFileDir;
this.fullPathForPartitionDBSnapshot =
blobTransferEnabled ? RocksDBUtils.composeSnapshotDir(dbDir, storeName, partitionId) : null;
this.isRMD = isRMD;
this.lastCheckPointedSSTFileNum = isRMD ? ROCKSDB_LAST_FINISHED_RMD_SST_FILE_NO : ROCKSDB_LAST_FINISHED_SST_FILE_NO;
this.rocksDBServerConfig = rocksDBServerConfig;
Expand Down Expand Up @@ -501,25 +497,6 @@ public void ingestSSTFiles(RocksDB rocksDB, List<ColumnFamilyHandle> columnFamil
}
}

public void createSnapshot(RocksDB rocksDB) {
if (fullPathForPartitionDBSnapshot == null || fullPathForPartitionDBSnapshot.isEmpty()) {
return;
}

try {
Checkpoint checkpoint = createCheckpoint(rocksDB);

LOGGER.info("Start creating snapshots in directory: {}", this.fullPathForPartitionDBSnapshot);
checkpoint.createCheckpoint(this.fullPathForPartitionDBSnapshot);

LOGGER.info("Finished creating snapshots in directory: {}", this.fullPathForPartitionDBSnapshot);
} catch (RocksDBException e) {
throw new VeniceException(
"Received exception during RocksDB's snapshot creation in directory " + this.fullPathForPartitionDBSnapshot,
e);
}
}

private List<String> getTemporarySSTFilePaths() {
File tempSSTFileDir = new File(fullPathForTempSSTFileDir);
String[] sstFiles = tempSSTFileDir.list((dir, name) -> isTempSSTFile(name) && new File(dir, name).length() > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.davinci.store.AbstractStorageEngine.METADATA_PARTITION_ID;

import com.linkedin.davinci.blobtransfer.BlobSnapshotManager;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.stats.RocksDBMemoryStats;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
*/
protected final WriteOptions writeOptions;
private final String fullPathForTempSSTFileDir;
private final String fullPathForTempSnapshotFileDir;
private final String fullPathForPartitionDBSnapshot;

private final EnvOptions envOptions;

Expand Down Expand Up @@ -205,7 +206,7 @@ protected RocksDBStoragePartition(
this.expectedChecksumSupplier = Optional.empty();
this.rocksDBThrottler = rocksDbThrottler;
this.fullPathForTempSSTFileDir = RocksDBUtils.composeTempSSTFileDir(dbDir, storeNameAndVersion, partitionId);
this.fullPathForTempSnapshotFileDir =
this.fullPathForPartitionDBSnapshot =
blobTransferEnabled ? RocksDBUtils.composeSnapshotDir(dbDir, storeNameAndVersion, partitionId) : null;

if (deferredWrite) {
Expand All @@ -217,8 +218,7 @@ protected RocksDBStoragePartition(
options,
fullPathForTempSSTFileDir,
false,
rocksDBServerConfig,
blobTransferEnabled);
rocksDBServerConfig);
}

/**
Expand Down Expand Up @@ -484,16 +484,12 @@ public synchronized void endBatchWrite() {
* the last SST file written is finished.
*/
rocksDBSstFileWriter.ingestSSTFiles(rocksDB, columnFamilyHandleList);

if (blobTransferEnabled) {
createSnapshot();
}
}

@Override
public synchronized void createSnapshot() {
if (blobTransferEnabled) {
rocksDBSstFileWriter.createSnapshot(rocksDB);
BlobSnapshotManager.createSnapshotForBatch(rocksDB, fullPathForPartitionDBSnapshot);
}
}

Expand Down Expand Up @@ -833,7 +829,7 @@ public synchronized void drop() {
// Remove extra SST files first
deleteFilesInDirectory(fullPathForTempSSTFileDir);
// remove snapshots files
deleteFilesInDirectory(fullPathForTempSnapshotFileDir);
deleteFilesInDirectory(fullPathForPartitionDBSnapshot);
// Remove partition directory
deleteDirectory(fullPathForPartitionDB);
LOGGER.info("RocksDB for replica:{} was dropped.", replicaId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@

import static org.mockito.Mockito.*;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
Expand Down Expand Up @@ -140,4 +146,72 @@ public void testSnapshotNotUpdatedWhenNotHybrid() throws RocksDBException {
blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID);
verify(mockCheckpoint, times(0)).createCheckpoint(DB_DIR + "/.snapshot_files");
}

@Test
public void testCreateSnapshotForBatch() throws RocksDBException {
try (MockedStatic<Checkpoint> checkpointMockedStatic = Mockito.mockStatic(Checkpoint.class)) {
try (MockedStatic<FileUtils> fileUtilsMockedStatic = Mockito.mockStatic(FileUtils.class)) {
// test prepare
RocksDB mockRocksDB = mock(RocksDB.class);
Checkpoint mockCheckpoint = mock(Checkpoint.class);
checkpointMockedStatic.when(() -> Checkpoint.create(mockRocksDB)).thenReturn(mockCheckpoint);
String fullSnapshotPath = DB_DIR + "/.snapshot_files";
File file = spy(new File(fullSnapshotPath));
doNothing().when(mockCheckpoint).createCheckpoint(fullSnapshotPath);

// case 1: snapshot file not exists
// test execute
BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath);
// test verify
verify(mockCheckpoint, times(1)).createCheckpoint(fullSnapshotPath);
fileUtilsMockedStatic.verify(() -> FileUtils.deleteDirectory(eq(file.getAbsoluteFile())), times(0));

// case 2: snapshot file exists
// test prepare
File fullSnapshotDir = new File(fullSnapshotPath);
if (!fullSnapshotDir.exists()) {
fullSnapshotDir.mkdirs();
}
// test execute
BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath);
// test verify
verify(mockCheckpoint, times(2)).createCheckpoint(fullSnapshotPath);
fileUtilsMockedStatic.verify(() -> FileUtils.deleteDirectory(eq(file.getAbsoluteFile())), times(1));

// case 3: delete snapshot file fail
// test prepare
fileUtilsMockedStatic.when(() -> FileUtils.deleteDirectory(any(File.class)))
.thenThrow(new IOException("Delete snapshot file failed."));
// test execute
try {
BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath);
Assert.fail("Should throw exception");
} catch (VeniceException e) {
// test verify
verify(mockCheckpoint, times(2)).createCheckpoint(fullSnapshotPath);
fileUtilsMockedStatic.verify(() -> FileUtils.deleteDirectory(eq(file.getAbsoluteFile())), times(2));
Assert.assertEquals(e.getMessage(), "Failed to delete the existing snapshot directory: " + fullSnapshotPath);
}

// case 4: create createCheckpoint failed
// test prepare
fullSnapshotDir.delete();
fileUtilsMockedStatic.reset();
doThrow(new RocksDBException("Create checkpoint failed.")).when(mockCheckpoint)
.createCheckpoint(fullSnapshotPath);
// test execute
try {
BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath);
Assert.fail("Should throw exception");
} catch (VeniceException e) {
// test verify
verify(mockCheckpoint, times(3)).createCheckpoint(fullSnapshotPath);
fileUtilsMockedStatic.verify(() -> FileUtils.deleteDirectory(eq(file.getAbsoluteFile())), times(0));
Assert.assertEquals(
e.getMessage(),
"Received exception during RocksDB's snapshot creation in directory " + fullSnapshotPath);
}
}
}
}
}
Loading

0 comments on commit 7e66fd3

Please sign in to comment.