Skip to content

Commit

Permalink
Always use split commit on server (#11680)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Sep 26, 2023
1 parent 9ed9f21 commit ff264a3
Show file tree
Hide file tree
Showing 21 changed files with 92 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ public String segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INS
return responseStr;
}

// Remove after releasing 1.1 (server always use split commit)
@Deprecated
@POST
@Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT)
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.COMMIT_SEGMENT)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* Highlights:
* 1-An object of this class is hosted by each RealtimeTableDataManager.
* 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table.
* 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects hosted in the corresponding
* 3-Partition delays are updated by all RealtimeSegmentDataManager objects hosted in the corresponding
* RealtimeTableDataManager.
* 4-Individual metrics are associated with each partition being tracked.
* 5-Delays for partitions that do not have events to consume are reported as zero.
Expand All @@ -58,12 +58,12 @@
* (CONSUMING -> ONLINE state change)
* |
* markPartitionForConfirmation(partitionId)
* | |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0}}
* | |<-updateIngestionDelay()-{RealtimeSegmentDataManager(Partition 0}}
* | |
* ___________V_________________________V_
* | (Table X) |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1}}
* | (Table X) |<-updateIngestionDelay()-{RealtimeSegmentDataManager(Partition 1}}
* | IngestionDelayTracker | ...
* |____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager (Partition n}}
* |____________________________________|<-updateIngestionDelay()-{RealtimeSegmentDataManager (Partition n}}
* ^ ^
* | \
* timeoutInactivePartitions() stopTrackingPartitionIngestionDelay(partitionId)
Expand Down Expand Up @@ -202,7 +202,7 @@ void setClock(Clock clock) {
}

/*
* Called by LLRealTimeSegmentDataManagers to post ingestion time updates to this tracker class.
* Called by RealTimeSegmentDataManagers to post ingestion time updates to this tracker class.
*
* @param ingestionTimeMs ingestion time being recorded.
* @param firstStreamIngestionTimeMs time the event was ingested in the first stage of the ingestion pipeline.
Expand Down Expand Up @@ -332,8 +332,8 @@ public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
}

/*
* We use this method to clean up when a table is being removed. No updates are expected at this time
* as all LLRealtimeSegmentManagers should be down now.
* We use this method to clean up when a table is being removed. No updates are expected at this time as all
* RealtimeSegmentManagers should be down now.
*/
public void shutdown() {
// Now that segments can't report metric, destroy metric for this table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
Expand Down Expand Up @@ -104,8 +107,10 @@
/**
* Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
*/
public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
protected enum State {
public class LLRealtimeSegmentDataManager extends SegmentDataManager {

@VisibleForTesting
public enum State {
// The state machine starts off with this state. While in this state we consume stream events
// and index them in memory. We continue to be in this state until the end criteria is satisfied
// (time or number of rows)
Expand Down Expand Up @@ -154,8 +159,6 @@ public boolean isFinal() {
}
}

private static final int MINIMUM_CONSUME_TIME_MINUTES = 10;

@VisibleForTesting
public class SegmentBuildDescriptor {
final File _segmentTarFile;
Expand Down Expand Up @@ -208,6 +211,9 @@ public void deleteSegmentFile() {
}
}

public static final String RESOURCE_TEMP_DIR_NAME = "_tmp";

private static final int MINIMUM_CONSUME_TIME_MINUTES = 10;
private static final long TIME_THRESHOLD_FOR_LOG_MINUTES = 1;
private static final long TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS = 1;
private static final int MSG_COUNT_THRESHOLD_FOR_LOG = 100000;
Expand Down Expand Up @@ -757,7 +763,7 @@ public void run() {
_state = State.ERROR;
_segmentLogger.error("Could not build segment for {}", _segmentNameStr);
} else {
success = commitSegment(response.getControllerVipUrl(), _indexLoadingConfig.isEnableSplitCommit());
success = commitSegment(response.getControllerVipUrl());
if (success) {
_state = State.COMMITTED;
} else {
Expand Down Expand Up @@ -839,31 +845,39 @@ protected void buildSegmentForCommit(long buildTimeLeaseMs) {
}
}

@Override
/**
* Returns the current offset for the partition group.
*/
public Map<String, String> getPartitionToCurrentOffset() {
Map<String, String> partitionToCurrentOffset = new HashMap<>();
partitionToCurrentOffset.put(String.valueOf(_partitionGroupId), _currentOffset.toString());
return partitionToCurrentOffset;
return Collections.singletonMap(String.valueOf(_partitionGroupId), _currentOffset.toString());
}

@Override
/**
* Returns the state of the consumer.
*/
public ConsumerState getConsumerState() {
return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING;
}

@Override
/**
* Returns the timestamp of the last consumed message.
*/
public long getLastConsumedTimestamp() {
return _lastConsumedTimestampMs;
}

@Override
/**
* Returns the {@link ConsumerPartitionState} for the partition group.
*/
public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
String partitionGroupId = String.valueOf(_partitionGroupId);
return Collections.singletonMap(partitionGroupId, new ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000), _lastRowMetadata));
}

@Override
/**
* Returns the {@link PartitionLagState} for the partition group.
*/
public Map<String, PartitionLagState> getPartitionToLagState(
Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
if (_partitionMetadataProvider == null) {
Expand All @@ -881,21 +895,22 @@ public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
}

@VisibleForTesting
protected SegmentBuildDescriptor getSegmentBuildDescriptor() {
SegmentBuildDescriptor getSegmentBuildDescriptor() {
return _segmentBuildDescriptor;
}

@VisibleForTesting
protected Semaphore getPartitionGroupConsumerSemaphore() {
Semaphore getPartitionGroupConsumerSemaphore() {
return _partitionGroupConsumerSemaphore;
}

@VisibleForTesting
protected AtomicBoolean getAcquiredConsumerSemaphore() {
AtomicBoolean getAcquiredConsumerSemaphore() {
return _acquiredConsumerSemaphore;
}

protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
@VisibleForTesting
SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
closeStreamConsumers();
// Do not allow building segment when table data manager is already shut down
if (_realtimeTableDataManager.isShutDown()) {
Expand Down Expand Up @@ -1021,25 +1036,24 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
}
}

protected boolean commitSegment(String controllerVipUrl, boolean isSplitCommit) {
@VisibleForTesting
boolean commitSegment(String controllerVipUrl) {
File segmentTarFile = _segmentBuildDescriptor.getSegmentTarFile();
if (segmentTarFile == null || !segmentTarFile.exists()) {
throw new RuntimeException("Segment file does not exist: " + segmentTarFile);
}
SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl, isSplitCommit);

if (!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
Preconditions.checkState(segmentTarFile != null && segmentTarFile.exists(), "Segment tar file: %s does not exist",
segmentTarFile);
SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl);
if (commitResponse.getStatus() != SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS) {
_segmentLogger.warn("Controller response was {} and not {}", commitResponse.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
return false;
}

_realtimeTableDataManager.replaceLLSegment(_segmentNameStr, _indexLoadingConfig);
removeSegmentFile();
return true;
}

protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) {
@VisibleForTesting
SegmentCompletionProtocol.Response commit(String controllerVipUrl) {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();

params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
Expand All @@ -1053,7 +1067,7 @@ protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boo

SegmentCommitter segmentCommitter;
try {
segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(params, controllerVipUrl);
} catch (URISyntaxException e) {
_segmentLogger.error("Failed to create a segment committer: ", e);
return SegmentCompletionProtocol.RESP_NOT_SENT;
Expand Down Expand Up @@ -1274,7 +1288,6 @@ protected void doDestroy() {
cleanupMetrics();
}

@Override
public void startConsumption() {
_consumerThread = new Thread(new PartitionConsumer(), _segmentNameStr);
_segmentLogger.info("Created new consumer thread {} for {}", _consumerThread, this);
Expand Down Expand Up @@ -1346,9 +1359,14 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
_clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
_segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
_memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
serverMetrics);
if (_indexLoadingConfig.isRealtimeOffHeapAllocation() && !_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
_memoryManager =
new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(), _segmentNameStr, _serverMetrics);
} else {
// For on-heap allocation, we still need a memory manager for forward index.
// Dictionary will be allocated on heap.
_memoryManager = new DirectMemoryManager(_segmentNameStr, _serverMetrics);
}

_rateLimiter = RealtimeConsumptionRateManager.getInstance()
.createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, _clientId);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ protected void doShutdown() {
}

/*
* Method used by LLRealtimeSegmentManagers to update their partition delays
* Method used by RealtimeSegmentManagers to update their partition delays
*
* @param ingestionTimeMs Ingestion delay being reported.
* @param partitionGroupId Partition ID for which delay is being updated.
Expand Down
Loading

0 comments on commit ff264a3

Please sign in to comment.