diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index cdf2a61fe96..47dd64f49d8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -513,13 +513,16 @@ public void removeSegment(IndexSegment segment) { _logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName); return; } - // Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL) + // Skip removing the upsert metadata of segment that has max comparison value smaller than + // (largestSeenComparisonValue - TTL), i.e. out of metadata TTL. The expired metadata is removed while creating + // new consuming segment in batches. + boolean skipRemoveMetadata = false; if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) { Number maxComparisonValue = (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) { _logger.info("Skip removing segment: {} because it's out of TTL", segmentName); - return; + skipRemoveMetadata = true; } } if (!startOperation()) { @@ -530,7 +533,9 @@ public void removeSegment(IndexSegment segment) { _snapshotLock.readLock().lock(); } try { - doRemoveSegment(segment); + if (!skipRemoveMetadata) { + doRemoveSegment(segment); + } _trackedSegments.remove(segment); } finally { if (_enableSnapshot) { @@ -708,8 +713,7 @@ protected void doTakeSnapshot() { ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments); _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT, numPrimaryKeysInSnapshot); - _logger.info( - "Finished taking snapshot for {} immutable segments with {} primary keys (out of {} total segments, " + _logger.info("Finished taking snapshot for {} immutable segments with {} primary keys (out of {} total segments, " + "{} are consuming segments) in {} ms", numImmutableSegments, numPrimaryKeysInSnapshot, numTrackedSegments, numConsumingSegments, System.currentTimeMillis() - startTimeMs); }