Skip to content

Commit

Permalink
allow preloading segments with upsert snapshots to speed up table load…
Browse files Browse the repository at this point in the history
…ing (apache#11020)
  • Loading branch information
klsince authored Jul 11, 2023
1 parent 19dce0e commit 4bb7e6a
Show file tree
Hide file tree
Showing 23 changed files with 626 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -99,6 +100,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
protected File _resourceTmpDir;
protected Logger _logger;
protected HelixManager _helixManager;
protected ExecutorService _segmentPreloadExecutor;
protected AuthProvider _authProvider;
protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
protected boolean _isStreamSegmentDownloadUntar;
Expand All @@ -114,6 +116,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
@Override
public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
TableDataManagerParams tableDataManagerParams) {
LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName());
Expand All @@ -123,6 +126,7 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI
_propertyStore = propertyStore;
_serverMetrics = serverMetrics;
_helixManager = helixManager;
_segmentPreloadExecutor = segmentPreloadExecutor;

_authProvider =
AuthProviderUtils.extractAuthProvider(toPinotConfiguration(_tableDataManagerConfig.getAuthConfig()), null);
Expand Down Expand Up @@ -681,8 +685,8 @@ File getSegmentDataDir(String segmentName) {
return new File(_indexDir, segmentName);
}

@VisibleForTesting
File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
@Override
public File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
if (segmentTier == null) {
return getSegmentDataDir(segmentName);
}
Expand Down Expand Up @@ -763,7 +767,8 @@ private void removeBackup(File indexDir)
* object may be created when trying to load the segment, but it's closed if the method
* returns false; otherwise it's opened and to be referred by ImmutableSegment object.
*/
protected boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
@Override
public boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
SegmentZKMetadata zkMetadata) {
// Try to recover the segment from potential segment reloading failure.
String segmentTier = zkMetadata.getTier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import com.google.common.cache.LoadingCache;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
Expand Down Expand Up @@ -60,13 +62,14 @@ public static void init(InstanceDataManagerConfig instanceDataManagerConfig) {
public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, null,
errorCache, () -> true);
}

public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) {
@Nullable ExecutorService segmentPreloadExecutor, LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
Supplier<Boolean> isServerReadyToServeQueries) {
TableDataManager tableDataManager;
switch (tableDataManagerConfig.getTableType()) {
case OFFLINE:
Expand All @@ -90,8 +93,8 @@ public static TableDataManager getTableDataManager(TableDataManagerConfig tableD
default:
throw new IllegalStateException();
}
tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache,
_tableDataManagerParams);
tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
segmentPreloadExecutor, errorCache, _tableDataManagerParams);
return tableDataManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {

private TableDedupMetadataManager _tableDedupMetadataManager;
private TableUpsertMetadataManager _tableUpsertMetadataManager;
private boolean _isUpsertEnabled;
private BooleanSupplier _isTableReadyToConsumeData;

public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
Expand Down Expand Up @@ -205,7 +206,12 @@ protected void doInit() {
_tableUpsertMetadataManager);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
_tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
// While creating _tableUpsertMetadataManager object, some methods want to check if upsert is enabled, so track
// this status with a boolean, instead of relying on if _tableUpsertMetadataManager is null or not.
_isUpsertEnabled = true;
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics, _helixManager,
_segmentPreloadExecutor);
}

// For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
Expand Down Expand Up @@ -352,7 +358,7 @@ public boolean isDedupEnabled() {
}

public boolean isUpsertEnabled() {
return _tableUpsertMetadataManager != null;
return _isUpsertEnabled;
}

public boolean isPartialUpsertEnabled() {
Expand Down Expand Up @@ -533,7 +539,11 @@ private void handleUpsert(ImmutableSegment immutableSegment) {
ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = registerSegment(segmentName, newSegmentManager);
if (oldSegmentManager == null) {
partitionUpsertMetadataManager.addSegment(immutableSegment);
if (_tableUpsertMetadataManager.isPreloading()) {
partitionUpsertMetadataManager.preloadSegment(immutableSegment);
} else {
partitionUpsertMetadataManager.addSegment(immutableSegment);
}
_logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
} else {
IndexSegment oldSegment = oldSegmentManager.getSegment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private TableDataManager makeTestableManager()
when(config.getTableDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
}
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ private static BaseTableDataManager createTableManager() {

OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return tableDataManager;
Expand All @@ -728,7 +728,7 @@ private static BaseTableDataManager createTableManager() {
private static BaseTableDataManager createTableManager(TableDataManagerConfig config, HelixManager helixManager) {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return tableDataManager;
Expand All @@ -737,7 +737,7 @@ private static BaseTableDataManager createTableManager(TableDataManagerConfig co
private static OfflineTableDataManager createSpyOfflineTableManager(TableDataManagerConfig tableDataManagerConfig) {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(tableDataManagerConfig, "dummyInstance", mock(ZkHelixPropertyStore.class),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return Mockito.spy(tableDataManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private DimensionTableDataManager makeTableDataManager(HelixManager helixManager
when(config.getDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
}
tableDataManager.init(config, "dummyInstance", helixManager.getHelixPropertyStore(),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return tableDataManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testAddSegmentUseBackupCopy()
TableConfig tableConfig = setupTableConfig(propertyStore);
Schema schema = setupSchema(propertyStore);
tmgr.init(tableDataManagerConfig, "server01", propertyStore,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));

// Create a dummy local segment.
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testAddSegmentNoBackupCopy()
TableConfig tableConfig = setupTableConfig(propertyStore);
Schema schema = setupSchema(propertyStore);
tmgr.init(tableDataManagerConfig, "server01", propertyStore,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));

// Create a raw segment and put it in deep store backed by local fs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -34,6 +35,7 @@
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;


Expand All @@ -48,7 +50,9 @@ public interface TableDataManager {
*/
void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, TableDataManagerParams tableDataManagerParams);
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
TableDataManagerParams tableDataManagerParams);

/**
* Starts the table data manager. Should be called only once after table data manager gets initialized but before
Expand Down Expand Up @@ -126,6 +130,21 @@ void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConf
*/
void removeSegment(String segmentName);

/**
 * Tries to load a segment from an existing segment directory already managed by the server. Loading may fail
 * because the directory no longer exists, the segment data has a different CRC now, or the existing segment data
 * got corrupted, etc.
 *
 * @return true if the segment is loaded successfully from the existing segment directory; false otherwise.
 */
boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
SegmentZKMetadata zkMetadata);

/**
 * Returns the segment data directory, choosing the tier-specific directory when a segment tier is provided and
 * falling back to the default data directory otherwise.
 */
File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig);

/**
* Returns true if the segment was deleted in the last few minutes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,76 @@ protected void doAddSegment(ImmutableSegmentImpl segment) {
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}

/**
 * Preloads a segment using its validDocIds snapshot, skipping the usual per-record upsert comparison done in
 * {@code addSegment}. Requires snapshots to be enabled, and only supports {@code ImmutableSegmentImpl}
 * (EmptyIndexSegment has no validDocIds snapshot and should never reach here).
 *
 * @param segment the immutable segment to preload
 */
@Override
public void preloadSegment(ImmutableSegment segment) {
  String segmentName = segment.getSegmentName();
  // Preloading after stop would race with teardown, so simply skip.
  if (_stopped) {
    _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
    return;
  }
  // NOTE: Guava Preconditions message templates use %s placeholders (not slf4j-style {}); with {} the arguments
  // would not be substituted into the error message.
  Preconditions.checkArgument(_enableSnapshot, "Snapshot must be enabled to preload segment: %s, table: %s",
      segmentName, _tableNameWithType);
  // Note that EmptyIndexSegment should not reach here either, as it doesn't have validDocIds snapshot.
  Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
      "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
      _tableNameWithType);
  // NOTE(review): read lock on _snapshotLock — presumably the write lock is held while snapshots are taken, so
  // preloading cannot interleave with snapshotting; confirm against the snapshot-taking path.
  _snapshotLock.readLock().lock();
  startOperation();
  try {
    doPreloadSegment((ImmutableSegmentImpl) segment);
    _trackedSegments.add(segment);
  } finally {
    finishOperation();
    _snapshotLock.readLock().unlock();
  }
}

/**
 * Does the actual preloading work for a single segment: reads the validDocIds snapshot, builds record info for
 * just the valid docs, and registers them via the preloading (no upsert-comparison) path.
 *
 * @param segment the immutable segment to preload; must have a validDocIds snapshot
 */
private void doPreloadSegment(ImmutableSegmentImpl segment) {
  String segmentName = segment.getSegmentName();
  _logger.info("Preloading segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
  long startTimeMs = System.currentTimeMillis();

  MutableRoaringBitmap validDocIds = segment.loadValidDocIdsFromSnapshot();
  // NOTE: Guava Preconditions message templates use %s placeholders, not slf4j-style {}; with {} the arguments
  // would not be substituted into the error message.
  Preconditions.checkState(validDocIds != null,
      "Snapshot of validDocIds is required to preload segment: %s, table: %s", segmentName, _tableNameWithType);
  if (validDocIds.isEmpty()) {
    _logger.info("Skip preloading segment: {} without valid doc, current primary key count: {}", segmentName,
        getNumPrimaryKeys());
    // NOTE(review): queryableDocIds is passed as null here even when a delete-record column is configured —
    // confirm this is intended for the empty-snapshot case.
    segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
    return;
  }

  // Iterate only over the docs marked valid in the snapshot; isPreloading=true skips the upsert comparison.
  try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
      _comparisonColumns, _deleteRecordColumn)) {
    addSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
  } catch (Exception e) {
    throw new RuntimeException(
        String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
        e);
  }

  // Update metrics
  long numPrimaryKeys = getNumPrimaryKeys();
  _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
      numPrimaryKeys);
  _logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", segmentName,
      System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}

/**
 * Adds the segment with the regular (non-preloading) upsert path, i.e. delegates to the 5-arg overload with
 * {@code isPreloading=false}.
 * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
 * tests. The passed in bitmaps should always be empty.
 */
@VisibleForTesting
public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
addSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
}

@VisibleForTesting
public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
boolean isPreloading) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
Expand All @@ -190,7 +253,11 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
if (queryableDocIds == null && _deleteRecordColumn != null) {
queryableDocIds = new ThreadSafeMutableRoaringBitmap();
}
addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
if (isPreloading) {
addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
} else {
addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
}
} finally {
segmentLock.unlock();
}
Expand All @@ -202,6 +269,11 @@ protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, Thread
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
@Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment);

/**
 * Adds a segment on the preloading path (invoked from addSegment when isPreloading is true). The default
 * implementation simply falls back to the regular addOrReplaceSegment path with no old segment; subclasses are
 * expected to override it with a cheaper variant that skips the upsert comparison.
 */
protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
}

@Override
public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
if (_stopped) {
Expand Down
Loading

0 comments on commit 4bb7e6a

Please sign in to comment.