From ff264a39990a456c7334fee7fbf388385c4553a5 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Tue, 26 Sep 2023 12:11:14 -0700 Subject: [PATCH] Always use split commit on server (#11680) --- .../LLCSegmentCompletionHandlers.java | 2 + .../realtime/DefaultSegmentCommitter.java | 55 ------------- .../realtime/IngestionDelayTracker.java | 14 ++-- .../LLRealtimeSegmentDataManager.java | 80 ++++++++++++------- .../realtime/RealtimeSegmentDataManager.java | 79 ------------------ .../realtime/RealtimeTableDataManager.java | 2 +- .../realtime/SegmentCommitterFactory.java | 16 +--- .../LLRealtimeSegmentDataManagerTest.java | 16 ++-- .../realtime/SegmentCommitterFactoryTest.java | 19 +---- ...DecoderRealtimeClusterIntegrationTest.java | 15 +--- .../LLCRealtimeClusterIntegrationTest.java | 10 +-- ...loadLLCRealtimeClusterIntegrationTest.java | 10 +-- .../index/loader/IndexLoadingConfig.java | 15 +--- .../server/api/resources/DebugResource.java | 4 +- .../server/api/resources/TablesResource.java | 8 +- .../helix/HelixInstanceDataManager.java | 17 ++-- .../helix/HelixInstanceDataManagerConfig.java | 15 ---- ...ngestionBasedConsumptionStatusChecker.java | 2 +- ...SegmentOnlineOfflineStateModelFactory.java | 3 +- .../instance/InstanceDataManagerConfig.java | 4 - .../pinot/spi/utils/CommonConstants.java | 3 - 21 files changed, 92 insertions(+), 297 deletions(-) delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index e774f914361..9ffd0a54160 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -213,6 +213,8 @@ public String segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INS return responseStr; } + // Remove after releasing 1.1 (server always use split commit) + @Deprecated @POST @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT) @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.COMMIT_SEGMENT) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java deleted file mode 100644 index 036b01189fe..00000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.data.manager.realtime; - -import java.io.File; -import org.apache.pinot.common.protocols.SegmentCompletionProtocol; -import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; -import org.slf4j.Logger; - - -/** - * Sends segmentCommit() to the controller. - * If that succeeds, swap in-memory segment with the one built. - */ -public class DefaultSegmentCommitter implements SegmentCommitter { - private final SegmentCompletionProtocol.Request.Params _params; - private final ServerSegmentCompletionProtocolHandler _protocolHandler; - - private final Logger _segmentLogger; - - public DefaultSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler, - SegmentCompletionProtocol.Request.Params params) { - _segmentLogger = segmentLogger; - _protocolHandler = protocolHandler; - _params = params; - } - - @Override - public SegmentCompletionProtocol.Response commit( - LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) { - File segmentTarFile = segmentBuildDescriptor.getSegmentTarFile(); - - SegmentCompletionProtocol.Response response = _protocolHandler.segmentCommit(_params, segmentTarFile); - if (!response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { - _segmentLogger.warn("Commit failed with response {}", response.toJsonString()); - } - return response; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index b2eeae7bb63..f0ae4c24ab2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -41,7 +41,7 @@ * Highlights: * 1-An object of this class is hosted by each RealtimeTableDataManager. * 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table. - * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects hosted in the corresponding + * 3-Partition delays are updated by all RealtimeSegmentDataManager objects hosted in the corresponding * RealtimeTableDataManager. * 4-Individual metrics are associated with each partition being tracked. * 5-Delays for partitions that do not have events to consume are reported as zero. @@ -58,12 +58,12 @@ * (CONSUMING -> ONLINE state change) * | * markPartitionForConfirmation(partitionId) - * | |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0}} + * | |<-updateIngestionDelay()-{RealtimeSegmentDataManager(Partition 0}} * | | * ___________V_________________________V_ - * | (Table X) |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1}} + * | (Table X) |<-updateIngestionDelay()-{RealtimeSegmentDataManager(Partition 1}} * | IngestionDelayTracker | ... - * |____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager (Partition n}} + * |____________________________________|<-updateIngestionDelay()-{RealtimeSegmentDataManager (Partition n}} * ^ ^ * | \ * timeoutInactivePartitions() stopTrackingPartitionIngestionDelay(partitionId) @@ -202,7 +202,7 @@ void setClock(Clock clock) { } /* - * Called by LLRealTimeSegmentDataManagers to post ingestion time updates to this tracker class. + * Called by RealTimeSegmentDataManagers to post ingestion time updates to this tracker class. * * @param ingestionTimeMs ingestion time being recorded. * @param firstStreamIngestionTimeMs time the event was ingested in the first stage of the ingestion pipeline. @@ -332,8 +332,8 @@ public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) { } /* - * We use this method to clean up when a table is being removed. No updates are expected at this time - * as all LLRealtimeSegmentManagers should be down now. + * We use this method to clean up when a table is being removed. No updates are expected at this time as all + * RealtimeSegmentManagers should be down now. */ public void shutdown() { // Now that segments can't report metric, destroy metric for this table diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 42e79e67dc7..97c1fbfd828 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -47,8 +47,11 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable; import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; @@ -104,8 +107,10 @@ /** * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion. */ -public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { - protected enum State { +public class LLRealtimeSegmentDataManager extends SegmentDataManager { + + @VisibleForTesting + public enum State { // The state machine starts off with this state. While in this state we consume stream events // and index them in memory. We continue to be in this state until the end criteria is satisfied // (time or number of rows) @@ -154,8 +159,6 @@ public boolean isFinal() { } } - private static final int MINIMUM_CONSUME_TIME_MINUTES = 10; - @VisibleForTesting public class SegmentBuildDescriptor { final File _segmentTarFile; @@ -208,6 +211,9 @@ public void deleteSegmentFile() { } } + public static final String RESOURCE_TEMP_DIR_NAME = "_tmp"; + + private static final int MINIMUM_CONSUME_TIME_MINUTES = 10; private static final long TIME_THRESHOLD_FOR_LOG_MINUTES = 1; private static final long TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS = 1; private static final int MSG_COUNT_THRESHOLD_FOR_LOG = 100000; @@ -757,7 +763,7 @@ public void run() { _state = State.ERROR; _segmentLogger.error("Could not build segment for {}", _segmentNameStr); } else { - success = commitSegment(response.getControllerVipUrl(), _indexLoadingConfig.isEnableSplitCommit()); + success = commitSegment(response.getControllerVipUrl()); if (success) { _state = State.COMMITTED; } else { @@ -839,31 +845,39 @@ protected void buildSegmentForCommit(long buildTimeLeaseMs) { } } - @Override + /** + * Returns the current offset for the partition group. + */ public Map getPartitionToCurrentOffset() { - Map partitionToCurrentOffset = new HashMap<>(); - partitionToCurrentOffset.put(String.valueOf(_partitionGroupId), _currentOffset.toString()); - return partitionToCurrentOffset; + return Collections.singletonMap(String.valueOf(_partitionGroupId), _currentOffset.toString()); } - @Override + /** + * Returns the state of the consumer. + */ public ConsumerState getConsumerState() { return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING; } - @Override + /** + * Returns the timestamp of the last consumed message. + */ public long getLastConsumedTimestamp() { return _lastConsumedTimestampMs; } - @Override + /** + * Returns the {@link ConsumerPartitionState} for the partition group. + */ public Map getConsumerPartitionState() { String partitionGroupId = String.valueOf(_partitionGroupId); return Collections.singletonMap(partitionGroupId, new ConsumerPartitionState(partitionGroupId, getCurrentOffset(), getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000), _lastRowMetadata)); } - @Override + /** + * Returns the {@link PartitionLagState} for the partition group. + */ public Map getPartitionToLagState( Map consumerPartitionStateMap) { if (_partitionMetadataProvider == null) { @@ -881,21 +895,22 @@ public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() { } @VisibleForTesting - protected SegmentBuildDescriptor getSegmentBuildDescriptor() { + SegmentBuildDescriptor getSegmentBuildDescriptor() { return _segmentBuildDescriptor; } @VisibleForTesting - protected Semaphore getPartitionGroupConsumerSemaphore() { + Semaphore getPartitionGroupConsumerSemaphore() { return _partitionGroupConsumerSemaphore; } @VisibleForTesting - protected AtomicBoolean getAcquiredConsumerSemaphore() { + AtomicBoolean getAcquiredConsumerSemaphore() { return _acquiredConsumerSemaphore; } - protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { + @VisibleForTesting + SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { closeStreamConsumers(); // Do not allow building segment when table data manager is already shut down if (_realtimeTableDataManager.isShutDown()) { @@ -1021,25 +1036,24 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { } } - protected boolean commitSegment(String controllerVipUrl, boolean isSplitCommit) { + @VisibleForTesting + boolean commitSegment(String controllerVipUrl) { File segmentTarFile = _segmentBuildDescriptor.getSegmentTarFile(); - if (segmentTarFile == null || !segmentTarFile.exists()) { - throw new RuntimeException("Segment file does not exist: " + segmentTarFile); - } - SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl, isSplitCommit); - - if (!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { + Preconditions.checkState(segmentTarFile != null && segmentTarFile.exists(), "Segment tar file: %s does not exist", + segmentTarFile); + SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl); + if (commitResponse.getStatus() != SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS) { _segmentLogger.warn("Controller response was {} and not {}", commitResponse.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); return false; } - _realtimeTableDataManager.replaceLLSegment(_segmentNameStr, _indexLoadingConfig); removeSegmentFile(); return true; } - protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) { + @VisibleForTesting + SegmentCompletionProtocol.Response commit(String controllerVipUrl) { SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString()) @@ -1053,7 +1067,7 @@ protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boo SegmentCommitter segmentCommitter; try { - segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl); + segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(params, controllerVipUrl); } catch (URISyntaxException e) { _segmentLogger.error("Failed to create a segment committer: ", e); return SegmentCompletionProtocol.RESP_NOT_SENT; @@ -1274,7 +1288,6 @@ protected void doDestroy() { cleanupMetrics(); } - @Override public void startConsumption() { _consumerThread = new Thread(new PartitionConsumer(), _segmentNameStr); _segmentLogger.info("Created new consumer thread {} for {}", _consumerThread, this); @@ -1346,9 +1359,14 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId; _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + streamTopic; - _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr, - indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), - serverMetrics); + if (_indexLoadingConfig.isRealtimeOffHeapAllocation() && !_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) { + _memoryManager = + new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(), _segmentNameStr, _serverMetrics); + } else { + // For on-heap allocation, we still need a memory manager for forward index. + // Dictionary will be allocated on heap. + _memoryManager = new DirectMemoryManager(_segmentNameStr, _serverMetrics); + } _rateLimiter = RealtimeConsumptionRateManager.getInstance() .createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, _clientId); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java deleted file mode 100644 index d98bc3be06b..00000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.data.manager.realtime; - -import java.util.Map; -import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.segment.local.data.manager.SegmentDataManager; -import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; -import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; -import org.apache.pinot.segment.spi.MutableSegment; -import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; -import org.apache.pinot.spi.stream.ConsumerPartitionState; -import org.apache.pinot.spi.stream.PartitionLagState; -import org.apache.pinot.spi.utils.CommonConstants.ConsumerState; - - -public abstract class RealtimeSegmentDataManager extends SegmentDataManager { - public static final String RESOURCE_TEMP_DIR_NAME = "_tmp"; - - @Override - public abstract MutableSegment getSegment(); - - protected static PinotDataBufferMemoryManager getMemoryManager(String consumerDir, String segmentName, - boolean offHeap, boolean directOffHeap, ServerMetrics serverMetrics) { - if (offHeap && !directOffHeap) { - return new MmapMemoryManager(consumerDir, segmentName, serverMetrics); - } else { - // For on-heap allocation, we still need a memory manager for forward index. - // Dictionary will be allocated on heap. - return new DirectMemoryManager(segmentName, serverMetrics); - } - } - - /** - * Get the current offsets for all partitions of this consumer - */ - public abstract Map getPartitionToCurrentOffset(); - - /** - * Starts the consumption of the underlying realtime segments. - * In some cases, it is helpful to not do this inside the constructor itself. - */ - public abstract void startConsumption(); - - /** - * Get the state of the consumer - */ - public abstract ConsumerState getConsumerState(); - - /** - * @return Timestamp at which the last record was indexed - */ - public abstract long getLastConsumedTimestamp(); - - /** - * @return Per-partition consumer's status, which typically includes last consumed message timestamp, - * latest available upstream offset etc - */ - public abstract Map getConsumerPartitionState(); - - public abstract Map getPartitionToLagState( - Map consumerPartitionStateMap); -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 3df3b91a6dd..e4b34d244d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -267,7 +267,7 @@ protected void doShutdown() { } /* - * Method used by LLRealtimeSegmentManagers to update their partition delays + * Method used by RealtimeSegmentManagers to update their partition delays * * @param ingestionTimeMs Ingestion delay being reported. * @param partitionGroupId Partition ID for which delay is being updated. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 297f30482be..f8315b99d96 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -51,26 +51,14 @@ public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProt _serverMetrics = serverMetrics; } - /** - * - * @param isSplitCommit Indicates if the controller has enabled split commit - * @param params Parameters to use in the Segment completion request - * @param controllerVipUrl Unused, - * @return - * @throws URISyntaxException - */ - public SegmentCommitter createSegmentCommitter(boolean isSplitCommit, SegmentCompletionProtocol.Request.Params params, + public SegmentCommitter createSegmentCommitter(SegmentCompletionProtocol.Request.Params params, String controllerVipUrl) throws URISyntaxException { - if (!isSplitCommit) { - return new DefaultSegmentCommitter(_logger, _protocolHandler, params); - } - SegmentUploader segmentUploader; - boolean uploadToFs = _streamConfig.isServerUploadToDeepStore(); String peerSegmentDownloadScheme = _tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(); String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI(); + SegmentUploader segmentUploader; if (uploadToFs || peerSegmentDownloadScheme != null) { // TODO: peer scheme non-null check exists for backwards compatibility. remove check once users have migrated segmentUploader = new PinotFSSegmentUploader(segmentStoreUri, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index 333e1a86041..b36059def77 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -953,7 +953,6 @@ private static InstanceDataManagerConfig makeInstanceDataManagerConfig() { when(dataManagerConfig.getReadMode()).thenReturn(null); when(dataManagerConfig.getAvgMultiValueCount()).thenReturn(null); when(dataManagerConfig.getSegmentFormatVersion()).thenReturn(null); - when(dataManagerConfig.isEnableSplitCommit()).thenReturn(false); when(dataManagerConfig.isRealtimeOffHeapAllocation()).thenReturn(false); when(dataManagerConfig.getConfig()).thenReturn(new PinotConfiguration()); return dataManagerConfig; @@ -999,7 +998,7 @@ public SegmentBuildDescriptor invokeBuildForCommit(long leaseTime) { } public boolean invokeCommit() { - return super.commitSegment("dummyUrl", false); + return super.commitSegment("dummyUrl"); } private void terminateLoopIfNecessary() { @@ -1039,9 +1038,8 @@ protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() { } @Override - protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) { - SegmentCompletionProtocol.Response response = _responses.remove(); - return response; + protected SegmentCompletionProtocol.Response commit(String controllerVipUrl) { + return _responses.remove(); } @Override @@ -1051,9 +1049,9 @@ protected void postStopConsumedMsg(String reason) { // TODO: Some of the tests rely on specific number of calls to the `now()` method in the SegmentDataManager. // This is not a good coding practice and makes the code very fragile. This needs to be fixed. - // Invoking now() in any part of LLRealtimeSegmentDataManager code will break the following tests: - // 1. LLRealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdIsReached - // 2. LLRealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndTimeThresholdIsReached + // Invoking now() in any part of RealtimeSegmentDataManager code will break the following tests: + // 1. RealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdIsReached + // 2. RealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndTimeThresholdIsReached @Override protected long now() { // now() is called in the constructor before _timeSupplier is set @@ -1093,7 +1091,7 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { } @Override - protected boolean commitSegment(String controllerVipUrl, boolean isSplitCommit) { + protected boolean commitSegment(String controllerVipUrl) { _commitSegmentCalled = true; return true; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java index f63b91433d1..65a2b16b961 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java @@ -55,19 +55,6 @@ private TableConfigBuilder createRealtimeTableConfig(String tableName, Map UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime"); private static final long RANDOM_SEED = System.currentTimeMillis(); private static final Random RANDOM = new Random(RANDOM_SEED); private static final int NUM_INVALID_RECORDS = 5; private final boolean _isDirectAlloc = RANDOM.nextBoolean(); private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); - private final boolean _enableSplitCommit = RANDOM.nextBoolean(); private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); - private final long _startTime = System.currentTimeMillis(); private SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry; @Override @@ -203,10 +197,6 @@ protected void overrideServerConf(PinotConfiguration configuration) { if (_isConsumerDirConfigured) { configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); } - if (_enableSplitCommit) { - configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true); - configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true); - } } @Override @@ -293,9 +283,8 @@ protected long getCountStarResult() { public void setUp() throws Exception { System.out.println(format( - "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, " - + "enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, - _enableSplitCommit, _enableLeadControllerResource)); + "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableLeadControllerResource: %s", + RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableLeadControllerResource)); // Remove the consumer directory FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY)); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index f71fc3a2c87..65d2adb9b32 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -72,7 +72,6 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr private final boolean _isDirectAlloc = RANDOM.nextBoolean(); private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); - private final boolean _enableSplitCommit = RANDOM.nextBoolean(); private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); private final long _startTime = System.currentTimeMillis(); @@ -100,10 +99,6 @@ protected void overrideServerConf(PinotConfiguration configuration) { if (_isConsumerDirConfigured) { configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); } - if (_enableSplitCommit) { - configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true); - configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true); - } } @Override @@ -203,9 +198,8 @@ protected long getCountStarResult() { public void setUp() throws Exception { System.out.println(String.format( - "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, " - + "enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, - _enableSplitCommit, _enableLeadControllerResource)); + "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableLeadControllerResource: %s", + RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableLeadControllerResource)); // Remove the consumer directory FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY)); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java index f264bb367c3..a4e164815d5 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java @@ -79,7 +79,6 @@ public class PeerDownloadLLCRealtimeClusterIntegrationTest extends BaseRealtimeC private final boolean _isDirectAlloc = true; //Set as true; otherwise trigger indexing exception. private final boolean _isConsumerDirConfigured = true; - private final boolean _enableSplitCommit = true; private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); private static File _pinotFsRootDir; @@ -88,9 +87,8 @@ public class PeerDownloadLLCRealtimeClusterIntegrationTest extends BaseRealtimeC public void setUp() throws Exception { System.out.println(String.format( - "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, " - + "enableLeadControllerResource: %s", - RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource)); + "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableLeadControllerResource: %s", + RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableLeadControllerResource)); _pinotFsRootDir = new File(FileUtils.getTempDirectoryPath() + File.separator + System.currentTimeMillis() + "/"); Preconditions.checkState(_pinotFsRootDir.mkdir(), "Failed to make a dir for " + _pinotFsRootDir.getPath()); @@ -167,10 +165,6 @@ protected void overrideServerConf(PinotConfiguration configuration) { if (_isConsumerDirConfigured) { configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); } - if (_enableSplitCommit) { - configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true); - configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true); - } } @Test diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java index e18efd2900b..7fe5bd33859 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java @@ -99,10 +99,8 @@ public class IndexLoadingConfig { private SegmentVersion _segmentVersion; private ColumnMinMaxValueGeneratorMode _columnMinMaxValueGeneratorMode = ColumnMinMaxValueGeneratorMode.DEFAULT_MODE; private int _realtimeAvgMultiValueCount = DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT; - private boolean _enableSplitCommit; private boolean _isRealtimeOffHeapAllocation; private boolean _isDirectRealtimeOffHeapAllocation; - private boolean _enableSplitCommitEndWithMetadata; private String _segmentStoreURI; private boolean _errorOnColumnBuildFailure; @@ -418,8 +416,6 @@ private void extractFromInstanceConfig(InstanceDataManagerConfig instanceDataMan _segmentVersion = SegmentVersion.valueOf(instanceSegmentVersion.toLowerCase()); } - _enableSplitCommit = instanceDataManagerConfig.isEnableSplitCommit(); - _isRealtimeOffHeapAllocation = instanceDataManagerConfig.isRealtimeOffHeapAllocation(); _isDirectRealtimeOffHeapAllocation = instanceDataManagerConfig.isDirectRealtimeOffHeapAllocation(); @@ -427,7 +423,6 @@ private void extractFromInstanceConfig(InstanceDataManagerConfig instanceDataMan if (avgMultiValueCount != null) { _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount); } - _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata(); _segmentStoreURI = instanceDataManagerConfig.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI); _segmentDirectoryLoader = instanceDataManagerConfig.getSegmentDirectoryLoader(); @@ -512,7 +507,7 @@ public FSTType getFSTIndexType() { /** * Used in two places: * (1) In {@link PhysicalColumnIndexContainer} to create the index loading info for immutable segments - * (2) In LLRealtimeSegmentDataManager to create the RealtimeSegmentConfig. + * (2) In RealtimeSegmentDataManager to create the RealtimeSegmentConfig. * RealtimeSegmentConfig is used to specify the text index column info for newly * to-be-created Mutable Segments * @return a set containing names of text index columns @@ -810,14 +805,6 @@ public void setSegmentVersion(SegmentVersion segmentVersion) { _dirty = true; } - public boolean isEnableSplitCommit() { - return _enableSplitCommit; - } - - public boolean isEnableSplitCommitEndWithMetadata() { - return _enableSplitCommitEndWithMetadata; - } - public boolean isRealtimeOffHeapAllocation() { return _isRealtimeOffHeapAllocation; } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java index 5c666a28bdb..e7b79c5156e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java @@ -45,7 +45,7 @@ import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.common.restlet.resources.SegmentServerDebugInfo; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; -import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.ImmutableSegment; @@ -166,7 +166,7 @@ private long getSegmentSize(SegmentDataManager segmentDataManager) { private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDataManager, TableType tableType) { SegmentConsumerInfo segmentConsumerInfo = null; if (tableType == TableType.REALTIME) { - RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; + LLRealtimeSegmentDataManager realtimeSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager; Map partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState(); Map currentOffsets = realtimeSegmentDataManager.getPartitionToCurrentOffset(); Map upstreamLatest = partitionStateMap.entrySet().stream().collect( diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index b08833b966d..79993600448 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -76,7 +76,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; -import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -309,7 +309,7 @@ public String getTableIndexes( int totalSegmentCount = 0; Map> columnToIndexesCount = new HashMap<>(); for (SegmentDataManager segmentDataManager : allSegments) { - if (segmentDataManager instanceof RealtimeSegmentDataManager) { + if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { // REALTIME segments may not have indexes since not all indexes have mutable implementations continue; } @@ -685,8 +685,8 @@ public List getConsumingSegmentsInfo( List segmentDataManagers = tableDataManager.acquireAllSegments(); try { for (SegmentDataManager segmentDataManager : segmentDataManagers) { - if (segmentDataManager instanceof RealtimeSegmentDataManager) { - RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; + if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { + LLRealtimeSegmentDataManager realtimeSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager; Map partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState(); Map recordsLagMap = new HashMap<>(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 9fd9c3e44af..ba66798763f 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -55,7 +55,6 @@ import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader; -import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.util.SegmentRefreshSemaphore; @@ -172,7 +171,7 @@ private void initInstanceDataDir(File instanceDataDir) { File[] tableDataDirs = instanceDataDir.listFiles((dir, name) -> TableNameBuilder.isTableResource(name)); if (tableDataDirs != null) { for (File tableDataDir : tableDataDirs) { - File resourceTempDir = new File(tableDataDir, RealtimeSegmentDataManager.RESOURCE_TEMP_DIR_NAME); + File resourceTempDir = new File(tableDataDir, LLRealtimeSegmentDataManager.RESOURCE_TEMP_DIR_NAME); try { FileUtils.deleteDirectory(resourceTempDir); } catch (IOException e) { @@ -448,14 +447,11 @@ private void reloadSegmentWithMetadata(String tableNameWithType, SegmentMetadata tableNameWithType); return; } - // TODO: Support force committing HLC consuming segment - if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) { - LOGGER.warn("Cannot reload non-LLC consuming segment: {} in table: {}", segmentName, tableNameWithType); - return; + if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { + LOGGER.info("Reloading (force committing) consuming segment: {} in table: {}", segmentName, + tableNameWithType); + ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit(); } - LOGGER.info("Reloading (force committing) LLC consuming segment: {} in table: {}", segmentName, - tableNameWithType); - ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit(); return; } finally { tableDataManager.releaseSegment(segmentDataManager); @@ -605,8 +601,7 @@ public void forceCommit(String tableNameWithType, Set segmentNames) { if (segmentDataManager != null) { try { if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { - LLRealtimeSegmentDataManager llSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager; - llSegmentDataManager.forceCommit(); + ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit(); } } finally { tableDataManager.releaseSegment(segmentDataManager); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 67a28dd4482..84d72ad8b1a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -95,11 +95,6 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = "segment.stream.download.untar"; private static final boolean DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = false; - // Key of whether to enable split commit - private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit"; - // Key of whether to enable split commit end with segment metadata files. - private static final String ENABLE_SPLIT_COMMIT_END_WITH_METADATA = "enable.commitend.metadata"; - // Whether memory for realtime consuming segments should be allocated off-heap. private static final String REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap"; // And whether the allocation should be direct (default is to allocate via mmap) @@ -209,16 +204,6 @@ public String getSegmentFormatVersion() { return _instanceDataManagerConfiguration.getProperty(SEGMENT_FORMAT_VERSION); } - @Override - public boolean isEnableSplitCommit() { - return _instanceDataManagerConfiguration.getProperty(ENABLE_SPLIT_COMMIT, true); - } - - @Override - public boolean isEnableSplitCommitEndWithMetadata() { - return _instanceDataManagerConfiguration.getProperty(ENABLE_SPLIT_COMMIT_END_WITH_METADATA, true); - } - @Override public boolean isRealtimeOffHeapAllocation() { return _instanceDataManagerConfiguration.getProperty(REALTIME_OFFHEAP_ALLOCATION, true); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java index dab154004d3..54667a43aca 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java @@ -68,7 +68,7 @@ public int getNumConsumingSegmentsNotReachedIngestionCriteria() { } if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) { // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, - // segment data manager will not be of type LLRealtime. + // segment data manager will not be of type RealtimeSegmentDataManager. _logger.info("Segment {} is already committed and is considered caught up.", segName); _caughtUpSegments.add(segName); continue; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 8e4ef1b3f11..1ad92ee96df 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -110,8 +110,7 @@ public void onBecomeOnlineFromConsuming(Message message, NotificationContext con if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) { // We found an LLC segment that is not consuming right now, must be that we already swapped it with a // segment that has been built. Nothing to do for this state transition. - _logger.info( - "Segment {} not an instance of LLRealtimeSegmentDataManager. Reporting success for the transition", + _logger.info("Segment {} not an instance of RealtimeSegmentDataManager. Reporting success for the transition", acquiredSegment.getSegmentName()); return; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java index 4c380cb5b25..a97f1639e1c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java @@ -43,10 +43,6 @@ public interface InstanceDataManagerConfig { String getAvgMultiValueCount(); - boolean isEnableSplitCommit(); - - boolean isEnableSplitCommitEndWithMetadata(); - boolean isRealtimeOffHeapAllocation(); boolean isDirectRealtimeOffHeapAllocation(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 713bbb2fe32..56e6dcf9992 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -527,9 +527,6 @@ public static class Server { public static final int DEFAULT_ADMIN_API_PORT = 8097; public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = "pinot.server.instance.segment.format.version"; - public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit"; - public static final String CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA = - "pinot.server.instance.enable.commitend.metadata"; public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION = "pinot.server.instance.realtime.alloc.offheap"; public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION = "pinot.server.instance.realtime.alloc.offheap.direct";