[server] Add threadsafe mode to venice-server which adjusts message processing order #910

Open · wants to merge 12 commits into base: main
@@ -73,6 +73,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_MAX_IDLE_COUNT;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_THREAD_SAFE_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_ENABLED;
@@ -445,6 +446,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final int ingestionTaskMaxIdleCount;

private final boolean threadSafeMode;
Contributor:
We should probably find another name for this functionality, since technically the old code is also supposed to be threadsafe... it's just that the new code is intended to make it easier to maintain thread-safety (less likely to introduce concurrency bugs)...

I guess the most significant functional change with this mode is that the leader persists changes locally prior to writing to Kafka, and as a result the TransientRecordCache becomes unnecessary. Perhaps a name along those lines might be more clear?

How about: leaderPersistsLocallyBeforeProducingToVT / leader.persists.locally.before.producing.to.vt

It's a bit of a mouthful, but seems more clear... a more concise version might be "Leader Persists Before Producing", and in day-to-day operations we might end up calling it "LPBP", for short. IDK, I'm just riffing at this point 😂

Open to other suggestions too, of course.
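For illustration, a minimal sketch of how the suggested rename might look, assuming it follows the existing ConfigKeys conventions (the constant name and property string below come from the suggestion above, not from the PR):

    // Hypothetical rename sketch; this PR currently defines SERVER_INGESTION_TASK_THREAD_SAFE_MODE.
    public static final String SERVER_LEADER_PERSISTS_LOCALLY_BEFORE_PRODUCING_TO_VT =
        "leader.persists.locally.before.producing.to.vt";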


private final long metaStoreWriterCloseTimeoutInMS;
private final int metaStoreWriterCloseConcurrency;

@@ -727,6 +730,7 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
pubSubClientsFactory = new PubSubClientsFactory(serverProperties);
routerPrincipalName = serverProperties.getString(ROUTER_PRINCIPAL_NAME, "CN=venice-router");
ingestionTaskMaxIdleCount = serverProperties.getInt(SERVER_INGESTION_TASK_MAX_IDLE_COUNT, 10000);
threadSafeMode = serverProperties.getBoolean(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false);
metaStoreWriterCloseTimeoutInMS = serverProperties.getLong(META_STORE_WRITER_CLOSE_TIMEOUT_MS, 300000L);
metaStoreWriterCloseConcurrency = serverProperties.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
ingestionHeartbeatIntervalMs =
@@ -1280,6 +1284,10 @@ public int getIngestionTaskMaxIdleCount() {
return ingestionTaskMaxIdleCount;
}

public boolean isThreadSafeMode() {
return threadSafeMode;
}

public boolean isKMERegistrationFromMessageHeaderEnabled() {
return isKMERegistrationFromMessageHeaderEnabled;
}
@@ -21,15 +21,17 @@ public ActiveActiveProducerCallback(
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestamp) {
long beforeProcessingRecordTimestamp,
boolean syncOffsetsOnlyAfterProducing) {
super(
ingestionTask,
sourceConsumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestamp);
beforeProcessingRecordTimestamp,
syncOffsetsOnlyAfterProducing);
}

@Override
@@ -178,7 +178,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
beforeProcessingBatchRecordsTimestampMs);
} else {
/**
* The below flow must be executed in a critical session for the same key:
* The below flow must be executed in a critical section for the same key:
Contributor:
Thanks 😄 ...

* Read existing value/RMD from transient record cache/disk -> perform DCR and decide incoming value wins
* -> update transient record cache -> produce to VT (just call send, no need to wait for the produce future in the critical section)
*
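A minimal sketch of the per-key critical section described in that comment, using hypothetical names (keyLevelLocks, readFromCacheOrDisk, doDcr, and veniceWriter are illustrative stand-ins, not the actual Venice APIs):

    // Sketch only: leader-side Active-Active update flow under a per-key lock.
    ReentrantLock lock = keyLevelLocks.get(key);        // hypothetical lock striped by key
    lock.lock();
    try {
      ValueAndRmd existing = readFromCacheOrDisk(key);  // transient record cache first, then disk
      MergeResult merge = doDcr(existing, incoming);    // deterministic conflict resolution
      if (merge.incomingValueWins()) {
        transientRecordCache.put(key, merge.result());  // cache update inside the critical section
        veniceWriter.send(key, merge.result());         // async send; no need to await the future here
      }
    } finally {
      lock.unlock();
    }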
@@ -499,7 +499,6 @@ protected void processMessageAndMaybeProduceToKafka(
.updateLatestIgnoredUpstreamRTOffset(kafkaClusterIdToUrlMap.get(kafkaClusterId), sourceOffset);
} else {
validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation);

// Apply this update to any views for this store
// TODO: It'd be good to be able to do this in LeaderFollowerStoreIngestionTask instead, however, AA currently is
// the
@@ -1572,14 +1571,16 @@ protected LeaderProducerCallback createProducerCallback(
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs) {
long beforeProcessingRecordTimestampNs,
boolean syncOffsetsOnlyAfterProducing) {
return new ActiveActiveProducerCallback(
this,
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
syncOffsetsOnlyAfterProducing);
}
}
@@ -1602,13 +1602,27 @@ protected void produceToLocalKafka(
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs) {

if (this.runInThreadSafeMode) {
// Write to rocksdb. At the time of writing, this is the last step after a huge amount of processing and compression
// and whatnot. At this stage we do not sync the offset, instead doing that after a successful produce.
this.processConsumerRecord(
consumerRecord,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs,
false);
}

LeaderProducerCallback callback = createProducerCallback(
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
this.runInThreadSafeMode);
long sourceTopicOffset = consumerRecord.getOffset();
LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(sourceTopicOffset, kafkaClusterId);
partitionConsumptionState.setLastLeaderPersistFuture(leaderProducedRecordContext.getPersistedToDBFuture());
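Put differently, the ordering change in threadsafe mode might be summarized as follows (a simplified sketch; persistToRocksDB, produceToVersionTopic, and queueToDrainer are illustrative stand-ins for the calls above, not the actual method names):

    // Sketch of the two message-processing orders (not the actual method bodies).
    if (runInThreadSafeMode) {
      persistToRocksDB(record);                  // 1. persist locally first, without syncing offsets
      produceToVersionTopic(record, (result, ex) -> {
        if (ex == null) {
          maybeSyncOffsets(record);              // 2. sync offsets only after a successful produce
        }
      });
    } else {
      produceToVersionTopic(record, (result, ex) -> {
        if (ex == null) {
          queueToDrainer(record);                // persist + offset sync happen later, in the drainer
        }
      });
    }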
@@ -2093,7 +2107,8 @@ private void propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
leaderProducedRecordContext,
partition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
this.runInThreadSafeMode);
LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(consumerRecord.getOffset(), kafkaClusterId);
List<Integer> subPartitions =
PartitionUtils.getSubPartitions(partitionConsumptionState.getUserPartition(), amplificationFactor);
@@ -2160,7 +2175,7 @@ protected void recordHeartbeatReceived(
* This function should be called as one of the first steps in the processing pipeline for all messages consumed from any kafka topic.
*
* The caller of this function should only process this {@param consumerRecord} further if the return is
* {@link DelegateConsumerRecordResult#QUEUED_TO_DRAINER}.
* {@link DelegateConsumerRecordResult#QUEUE_TO_DRAINER}.
*
* This function assumes {@link #shouldProcessRecord(PubSubMessage, int)} has been called which happens in
* {@link StoreIngestionTask#produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int)}
@@ -2182,7 +2197,6 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
int kafkaClusterId,
long beforeProcessingPerRecordTimestampNs,
long beforeProcessingBatchRecordsTimestampMs) {
boolean produceToLocalKafka = false;
try {
KafkaKey kafkaKey = consumerRecord.getKey();
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
@@ -2198,9 +2212,9 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(subPartition);
if (partitionConsumptionState == null) {
// The partition is likely unsubscribed, will skip these messages.
return DelegateConsumerRecordResult.SKIPPED_MESSAGE;
return DelegateConsumerRecordResult.END_PROCESSING;
}
produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState);
boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState);
// UPDATE message is only expected in LEADER which must be produced to kafka.
MessageType msgType = MessageType.valueOf(kafkaValue);
if (msgType == MessageType.UPDATE && !produceToLocalKafka) {
@@ -2222,7 +2236,10 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* (i) it's a follower or (ii) leader is consuming from VT
*/
if (!produceToLocalKafka) {
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
// TODO: The next step will put the record in the drainer queue. When threadsafe mode is enabled, we skip
// the drainer during RT consumption and commit straight to RocksDB. To remove the drainer completely,
// we should do the same here.
return DelegateConsumerRecordResult.QUEUE_TO_DRAINER;
}

// If we are here the message must be produced to local kafka or silently consumed.
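As an aside, the return-value changes scattered through this diff suggest the DelegateConsumerRecordResult enum is being consolidated; a rough before/after sketch, inferred from the renames in this file (the exact enum definition and javadoc are not shown in the diff):

    // Inferred from the diff, not shown verbatim in the PR:
    // before: QUEUED_TO_DRAINER, PRODUCED_TO_KAFKA, SKIPPED_MESSAGE, DUPLICATE_MESSAGE
    // after:  QUEUE_TO_DRAINER, END_PROCESSING
    enum DelegateConsumerRecordResult {
      QUEUE_TO_DRAINER,  // hand the record to the drainer for persistence and offset sync
      END_PROCESSING     // terminal: produced, skipped, or duplicate; nothing further to do
    }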
@@ -2271,7 +2288,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
*/
divErrorMetricCallback.accept(e);
LOGGER.debug("{} : Skipping a duplicate record at offset: {}", ingestionTaskName, consumerRecord.getOffset());
return DelegateConsumerRecordResult.DUPLICATE_MESSAGE;
return DelegateConsumerRecordResult.END_PROCESSING;
}

if (kafkaKey.isControlMessage()) {
@@ -2427,7 +2444,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
if (isDataRecovery && !partitionConsumptionState.isBatchOnly()) {
// Ignore remote VT's TS message since we might need to consume more RT or incremental push data from VT
// that's no longer in the local/remote RT due to retention.
return DelegateConsumerRecordResult.SKIPPED_MESSAGE;
return DelegateConsumerRecordResult.END_PROCESSING;
}
leaderProducedRecordContext =
LeaderProducedRecordContext.newControlMessageRecord(kafkaKey.getKey(), controlMessage);
@@ -2448,7 +2465,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
beforeProcessingPerRecordTimestampNs);
break;
case VERSION_SWAP:
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
return DelegateConsumerRecordResult.QUEUE_TO_DRAINER;
default:
// do nothing
break;
@@ -2478,7 +2495,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
beforeProcessingPerRecordTimestampNs,
beforeProcessingBatchRecordsTimestampMs);
}
return DelegateConsumerRecordResult.PRODUCED_TO_KAFKA;
return DelegateConsumerRecordResult.END_PROCESSING;
} catch (Exception e) {
throw new VeniceException(
ingestionTaskName + " hasProducedToKafka: exception for message received from: "
@@ -3418,15 +3435,17 @@ protected LeaderProducerCallback createProducerCallback(
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs) {
long beforeProcessingRecordTimestampNs,
boolean syncOffsetsOnlyAfterProducing) {
return new LeaderProducerCallback(
this,
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
syncOffsetsOnlyAfterProducing);
}

protected Lazy<VeniceWriter<byte[], byte[], byte[]>> getVeniceWriter() {
@@ -55,14 +55,17 @@ public class LeaderProducerCallback implements ChunkAwareCallback {
protected ChunkedValueManifest oldValueManifest = null;
protected ChunkedValueManifest oldRmdManifest = null;

private final boolean syncOffsetsOnlyAfterProducing;

public LeaderProducerCallback(
LeaderFollowerStoreIngestionTask ingestionTask,
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sourceConsumerRecord,
PartitionConsumptionState partitionConsumptionState,
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs) {
long beforeProcessingRecordTimestampNs,
boolean syncOffsetsOnlyAfterProducing) {
this.ingestionTask = ingestionTask;
this.sourceConsumerRecord = sourceConsumerRecord;
this.partitionConsumptionState = partitionConsumptionState;
@@ -71,6 +74,7 @@ public LeaderProducerCallback(
this.leaderProducedRecordContext = leaderProducedRecordContext;
this.produceTimeNs = ingestionTask.isUserSystemStore() ? 0 : System.nanoTime();
this.beforeProcessingRecordTimestampNs = beforeProcessingRecordTimestampNs;
this.syncOffsetsOnlyAfterProducing = syncOffsetsOnlyAfterProducing;
}

@Override
@@ -156,7 +160,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) {
*/
if (chunkedValueManifest == null) {
leaderProducedRecordContext.setProducedOffset(produceResult.getOffset());
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
leaderProducedRecordContext,
subPartition,
@@ -194,7 +198,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) {
manifestPut,
leaderProducedRecordContext.getPersistedToDBFuture());
producedRecordForManifest.setProducedOffset(produceResult.getOffset());
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
producedRecordForManifest,
subPartition,
@@ -321,7 +325,7 @@ private long produceChunksToStoreBufferService(
LeaderProducedRecordContext producedRecordForChunk =
LeaderProducedRecordContext.newChunkPutRecord(ByteUtils.extractByteArray(chunkKey), chunkPut);
producedRecordForChunk.setProducedOffset(-1);
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
producedRecordForChunk,
subPartition,
@@ -347,7 +351,7 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man
LeaderProducedRecordContext producedRecordForChunk =
LeaderProducedRecordContext.newChunkDeleteRecord(ByteUtils.extractByteArray(chunkKey), chunkDelete);
producedRecordForChunk.setProducedOffset(-1);
ingestionTask.produceToStoreBufferService(
produceToStoreBufferService(
sourceConsumerRecord,
producedRecordForChunk,
subPartition,
@@ -357,6 +361,28 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man
}
}

protected void produceToStoreBufferService(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumedRecord,
LeaderProducedRecordContext leaderProducedRecordContext,
int subPartition,
String kafkaUrl,
long beforeProcessingRecordTimestampNs,
long currentTimeForMetricsMs) throws InterruptedException {
if (this.syncOffsetsOnlyAfterProducing) {
// sync offsets
ingestionTask
.maybeSyncOffsets(consumedRecord, leaderProducedRecordContext, partitionConsumptionState, subPartition);
Contributor: Does the PCS instance passed here have a chance of having been modified by another thread (the processing thread)? I imagine we don't clone the PCS in order to make them immutable, so I wonder if what we would be checkpointing here is guaranteed to represent the correct state, up until what was just produced, rather than up until what's been consumed by another thread?

Contributor (Author): That's a good catch, yeah it could happen.


} else {
ingestionTask.produceToStoreBufferService(
consumedRecord,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs,
currentTimeForMetricsMs);
}
}
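
To make the concern in that review thread concrete, here is a hypothetical interleaving (the thread roles, method names, and offsets are invented for illustration; they are not from the PR):

    // Hypothetical race on a shared, mutable PartitionConsumptionState (PCS):
    // T2 (processing thread): pcs.setLatestProcessedOffset(105);           // 101-105 processed, not yet produced
    // T1 (producer callback): checkpoint(pcs.getLatestProcessedOffset());  // persists 105, though only 100 was produced
    // A crash before 101-105 finish producing could then skip those records on restart.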

// Visible for VeniceWriter unit test.
public PartitionConsumptionState getPartitionConsumptionState() {
return partitionConsumptionState;
@@ -223,7 +223,14 @@ public class PartitionConsumptionState {
*/
private boolean firstHeartBeatSOSReceived;

public PartitionConsumptionState(int partition, int amplificationFactor, OffsetRecord offsetRecord, boolean hybrid) {
private boolean threadSafeMode;

public PartitionConsumptionState(
int partition,
int amplificationFactor,
OffsetRecord offsetRecord,
boolean hybrid,
boolean threadSafeMode) {
this.partition = partition;
this.amplificationFactor = amplificationFactor;
this.userPartition = PartitionUtils.getUserPartition(partition, amplificationFactor);
@@ -237,6 +244,7 @@ public PartitionConsumptionState(int partition, int amplificationFactor, OffsetR
this.processedRecordSizeSinceLastSync = 0;
this.leaderFollowerState = LeaderFollowerStateType.STANDBY;
this.expectedSSTFileChecksum = null;
this.threadSafeMode = threadSafeMode;
/**
* Initialize the latest consumed time with current time; otherwise, it's 0 by default
* and leader will be promoted immediately.
@@ -565,6 +573,10 @@ public void setTransientRecord(
int valueLen,
int valueSchemaId,
GenericRecord replicationMetadataRecord) {
if (this.threadSafeMode) {
// NoOp
return;
}
TransientRecord transientRecord =
new TransientRecord(value, valueOffset, valueLen, valueSchemaId, kafkaClusterId, kafkaConsumedOffset);
if (replicationMetadataRecord != null) {
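
This no-op ties back to the earlier naming discussion: if the leader persists to RocksDB before producing, readers no longer need the transient record cache. A rough sketch of the corresponding read path under that assumption (getTransientRecord, fromTransientRecord, and the storage call are illustrative, not verified against the actual API):

    // Sketch: how a read might behave once setTransientRecord() is a no-op in threadsafe mode.
    PartitionConsumptionState.TransientRecord cached = pcs.getTransientRecord(key);
    ValueAndRmd existing = (cached != null)
        ? fromTransientRecord(cached)            // default mode: cache holds not-yet-flushed leader writes
        : storageEngine.get(partition, key);     // threadsafe mode: RocksDB was updated before producing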