Skip to content

Commit

Permalink
Remove the global lock in SegmentCompletionManager (#11679)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Sep 26, 2023
1 parent 1b5de10 commit 9ed9f21
Showing 1 changed file with 25 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
Expand Down Expand Up @@ -78,8 +77,6 @@ private enum State {
private final PinotLLCRealtimeSegmentManager _segmentManager;
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
private final Lock[] _fsmLocks;
private static final int NUM_FSM_LOCKS = 20;

// Half hour max commit time for all segments
private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
Expand All @@ -98,12 +95,8 @@ public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegme
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
_leadControllerManager = leadControllerManager;
SegmentCompletionProtocol
.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
_fsmLocks = new Lock[NUM_FSM_LOCKS];
for (int i = 0; i < NUM_FSM_LOCKS; i++) {
_fsmLocks[i] = new ReentrantLock();
}
SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
}

public String getControllerVipUrl() {
Expand All @@ -122,51 +115,30 @@ protected StreamPartitionMsgOffsetFactory getStreamPartitionMsgOffsetFactory(LLC
return StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
}

// We need to make sure that we never create multiple FSMs for the same segment
// Obtain locks based on segment name, so as to disallow same segment names entering together
private SegmentCompletionFSM lookupOrCreateFsm(final LLCSegmentName segmentName, String msgType) {
final String segmentNameStr = segmentName.getSegmentName();

int lockIndex = (segmentNameStr.hashCode() & Integer.MAX_VALUE) % NUM_FSM_LOCKS;
Lock lock = _fsmLocks[lockIndex];
private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName llcSegmentName, String msgType) {
return _fsmMap.computeIfAbsent(llcSegmentName.getSegmentName(), k -> createFsm(llcSegmentName, msgType));
}

private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String msgType) {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
String segmentName = llcSegmentName.getSegmentName();
SegmentZKMetadata segmentMetadata = _segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
Preconditions.checkState(segmentMetadata != null, "Failed to find ZK metadata for segment: %s", segmentName);
SegmentCompletionFSM fsm;
try {
lock.lock();
fsm = _fsmMap.get(segmentNameStr);
if (fsm == null) {
// Look up propertystore to see if this is a completed segment
// TODO if we keep a list of last few committed segments, we don't need to go to zk for this.
final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
SegmentZKMetadata segmentMetadata =
_segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName.getSegmentName(), null);
if (segmentMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
// Best to go through the state machine for this case as well, so that all code regarding state handling is
// in one place
// Also good for synchronization, because it is possible that multiple threads take this path, and we don't
// want
// multiple instances of the FSM to be created for the same commit sequence at the same time.
StreamPartitionMsgOffsetFactory factory = getStreamPartitionMsgOffsetFactory(segmentName);
final StreamPartitionMsgOffset endOffset = factory.create(segmentMetadata.getEndOffset());
fsm = SegmentCompletionFSM
.fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), endOffset);
} else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
fsm = SegmentCompletionFSM
.fsmStoppedConsuming(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
} else {
// Segment is in the process of completing, and this is the first one to respond. Create fsm
fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
}
LOGGER.info("Created FSM {}", fsm);
_fsmMap.put(segmentNameStr, fsm);
}
} catch (Exception e) {
// Server gone wonky. Segment does not exist in propstore
LOGGER.error("Exception getting FSM for segment {}", segmentNameStr, e);
throw new RuntimeException("Exception getting FSM for segment " + segmentNameStr, e);
} finally {
lock.unlock();
}
if (segmentMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
// Segment is already committed
StreamPartitionMsgOffsetFactory factory = getStreamPartitionMsgOffsetFactory(llcSegmentName);
StreamPartitionMsgOffset endOffset = factory.create(segmentMetadata.getEndOffset());
fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, llcSegmentName, segmentMetadata.getNumReplicas(),
endOffset);
} else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, llcSegmentName,
segmentMetadata.getNumReplicas());
} else {
// Segment is in the process of completing, and this is the first one to respond. Create fsm
fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, llcSegmentName, segmentMetadata.getNumReplicas());
}
LOGGER.info("Created FSM {}", fsm);
return fsm;
}

Expand Down

0 comments on commit 9ed9f21

Please sign in to comment.