[server] Add threadsafe mode to venice-server which adjusts message processing order #910
base: main
```diff
@@ -178,7 +178,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
           beforeProcessingBatchRecordsTimestampMs);
     } else {
       /**
-       * The below flow must be executed in a critical session for the same key:
+       * The below flow must be executed in a critical section for the same key:
        * Read existing value/RMD from transient record cache/disk -> perform DCR and decide incoming value wins
        * -> update transient record cache -> produce to VT (just call send, no need to wait for the produce future in the critical session)
        *
```

Review comment (on the corrected line): Thanks 😄 ...
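For readers less familiar with this flow: the comment in the hunk above describes a read-modify-write sequence that must be atomic per key. One common way to get per-key mutual exclusion is lock striping. The sketch below is a simplified, hypothetical illustration of that pattern, not Venice's actual implementation; every name in it (PerKeyCriticalSection, resolve, produceAsync) is invented, and plain Strings stand in for the real value/RMD types.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a striped per-key lock guarding the
// read -> resolve -> update cache -> produce sequence described above.
// None of these class or method names come from the Venice codebase.
public class PerKeyCriticalSection {
  private final ReentrantLock[] stripes = new ReentrantLock[64];
  private final Map<String, String> transientCache = new ConcurrentHashMap<>();

  public PerKeyCriticalSection() {
    Arrays.setAll(stripes, i -> new ReentrantLock());
  }

  private ReentrantLock lockFor(String key) {
    // Keys that hash to the same stripe share a lock, so any two
    // operations on the same key are serialized.
    return stripes[Math.floorMod(key.hashCode(), stripes.length)];
  }

  public void process(String key, String incomingValue) {
    ReentrantLock lock = lockFor(key);
    lock.lock();
    try {
      // 1. Read the existing value from the transient cache (stand-in for cache/disk).
      String existing = transientCache.get(key);
      // 2. Deterministic conflict resolution (placeholder: incoming always wins).
      String winner = resolve(existing, incomingValue);
      // 3. Update the transient cache before producing.
      transientCache.put(key, winner);
      // 4. Fire-and-forget produce; the future is NOT awaited inside the lock.
      produceAsync(key, winner);
    } finally {
      lock.unlock();
    }
  }

  private String resolve(String existing, String incoming) {
    return incoming; // placeholder for real DCR logic
  }

  private void produceAsync(String key, String value) {
    // stand-in for an asynchronous producer send
  }
}
```

The property the comment insists on is mirrored in step 4: the send is issued inside the critical section (so per-key ordering is preserved), but the produce future is not awaited there, keeping the lock hold time short.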
```diff
@@ -499,7 +499,6 @@ protected void processMessageAndMaybeProduceToKafka(
           .updateLatestIgnoredUpstreamRTOffset(kafkaClusterIdToUrlMap.get(kafkaClusterId), sourceOffset);
     } else {
       validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation);
-
       // Apply this update to any views for this store
       // TODO: It'd be good to be able to do this in LeaderFollowerStoreIngestionTask instead, however, AA currently is
       // the
```
```diff
@@ -1572,14 +1571,16 @@ protected LeaderProducerCallback createProducerCallback(
       LeaderProducedRecordContext leaderProducedRecordContext,
       int subPartition,
       String kafkaUrl,
-      long beforeProcessingRecordTimestampNs) {
+      long beforeProcessingRecordTimestampNs,
+      boolean syncOffsetsOnlyAfterProducing) {
     return new ActiveActiveProducerCallback(
         this,
         consumerRecord,
         partitionConsumptionState,
         leaderProducedRecordContext,
         subPartition,
         kafkaUrl,
-        beforeProcessingRecordTimestampNs);
+        beforeProcessingRecordTimestampNs,
+        syncOffsetsOnlyAfterProducing);
   }
 }
```
```diff
@@ -55,14 +55,17 @@ public class LeaderProducerCallback implements ChunkAwareCallback {
   protected ChunkedValueManifest oldValueManifest = null;
   protected ChunkedValueManifest oldRmdManifest = null;

+  private final boolean syncOffsetsOnlyAfterProducing;
+
   public LeaderProducerCallback(
       LeaderFollowerStoreIngestionTask ingestionTask,
       PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sourceConsumerRecord,
       PartitionConsumptionState partitionConsumptionState,
       LeaderProducedRecordContext leaderProducedRecordContext,
       int subPartition,
       String kafkaUrl,
-      long beforeProcessingRecordTimestampNs) {
+      long beforeProcessingRecordTimestampNs,
+      boolean syncOffsetsOnlyAfterProducing) {
     this.ingestionTask = ingestionTask;
     this.sourceConsumerRecord = sourceConsumerRecord;
     this.partitionConsumptionState = partitionConsumptionState;
```
```diff
@@ -71,6 +74,7 @@ public LeaderProducerCallback(
     this.leaderProducedRecordContext = leaderProducedRecordContext;
     this.produceTimeNs = ingestionTask.isUserSystemStore() ? 0 : System.nanoTime();
     this.beforeProcessingRecordTimestampNs = beforeProcessingRecordTimestampNs;
+    this.syncOffsetsOnlyAfterProducing = syncOffsetsOnlyAfterProducing;
   }

   @Override
```
```diff
@@ -156,7 +160,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) {
        */
       if (chunkedValueManifest == null) {
         leaderProducedRecordContext.setProducedOffset(produceResult.getOffset());
-        ingestionTask.produceToStoreBufferService(
+        produceToStoreBufferService(
             sourceConsumerRecord,
             leaderProducedRecordContext,
             subPartition,
```
```diff
@@ -194,7 +198,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) {
             manifestPut,
             leaderProducedRecordContext.getPersistedToDBFuture());
         producedRecordForManifest.setProducedOffset(produceResult.getOffset());
-        ingestionTask.produceToStoreBufferService(
+        produceToStoreBufferService(
             sourceConsumerRecord,
             producedRecordForManifest,
             subPartition,
```
```diff
@@ -321,7 +325,7 @@ private long produceChunksToStoreBufferService(
       LeaderProducedRecordContext producedRecordForChunk =
           LeaderProducedRecordContext.newChunkPutRecord(ByteUtils.extractByteArray(chunkKey), chunkPut);
       producedRecordForChunk.setProducedOffset(-1);
-      ingestionTask.produceToStoreBufferService(
+      produceToStoreBufferService(
           sourceConsumerRecord,
           producedRecordForChunk,
           subPartition,
```
```diff
@@ -347,7 +351,7 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man
       LeaderProducedRecordContext producedRecordForChunk =
           LeaderProducedRecordContext.newChunkDeleteRecord(ByteUtils.extractByteArray(chunkKey), chunkDelete);
       producedRecordForChunk.setProducedOffset(-1);
-      ingestionTask.produceToStoreBufferService(
+      produceToStoreBufferService(
           sourceConsumerRecord,
           producedRecordForChunk,
           subPartition,
```
```diff
@@ -357,6 +361,28 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man
     }
   }

+  protected void produceToStoreBufferService(
+      PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumedRecord,
+      LeaderProducedRecordContext leaderProducedRecordContext,
+      int subPartition,
+      String kafkaUrl,
+      long beforeProcessingRecordTimestampNs,
+      long currentTimeForMetricsMs) throws InterruptedException {
+    if (this.syncOffsetsOnlyAfterProducing) {
+      // sync offsets
+      ingestionTask
+          .maybeSyncOffsets(consumedRecord, leaderProducedRecordContext, partitionConsumptionState, subPartition);
+    } else {
+      ingestionTask.produceToStoreBufferService(
+          consumedRecord,
+          leaderProducedRecordContext,
+          subPartition,
+          kafkaUrl,
+          beforeProcessingRecordTimestampNs,
+          currentTimeForMetricsMs);
+    }
+  }
+
   // Visible for VeniceWriter unit test.
   public PartitionConsumptionState getPartitionConsumptionState() {
     return partitionConsumptionState;
```

Review comment (on the maybeSyncOffsets call): Does the PCS instance passed here have a chance of having been modified by another thread (the processing thread)? I imagine we don't clone the PCS in order to make it immutable, so I wonder whether what we would be checkpointing here is guaranteed to represent the correct state, up until what was just produced, rather than up until what's been consumed by another thread.

Reply: That's a good catch, yeah it could happen.
Review comment: We should probably find another name for this functionality, since technically the old code is also supposed to be threadsafe... it's just that the new code is intended to make it easier to maintain thread-safety (less likely to introduce concurrency bugs).

I guess the most significant functional change with this mode is that the leader persists changes locally prior to writing to Kafka, and as a result the TransientRecordCache becomes unnecessary. Perhaps a name along those lines would be clearer? How about: leaderPersistsLocallyBeforeProducingToVT / leader.persists.locally.before.producing.to.vt

It's a bit of a mouthful, but it seems clearer... a more concise version might be "Leader Persists Before Producing", and in day-to-day operations we might end up calling it "LPBP" for short. IDK, I'm just riffing at this point 😂

Open to other suggestions too, of course.
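If the suggested rename were adopted, the server-side wiring might look something like the sketch below. This is purely illustrative: the property key is taken from the review suggestion above and is not an actual Venice config constant, and the surrounding class is invented.

```java
import java.util.Properties;

// Hypothetical config wiring for the flag proposed in the review above.
// Neither the key nor this class exists in the Venice codebase.
public class ThreadsafeModeConfig {
  public static final String LEADER_PERSISTS_LOCALLY_BEFORE_PRODUCING_TO_VT =
      "leader.persists.locally.before.producing.to.vt";

  private final boolean leaderPersistsLocallyBeforeProducingToVT;

  public ThreadsafeModeConfig(Properties props) {
    // Default to the legacy ordering (produce to VT first, persist after).
    this.leaderPersistsLocallyBeforeProducingToVT = Boolean
        .parseBoolean(props.getProperty(LEADER_PERSISTS_LOCALLY_BEFORE_PRODUCING_TO_VT, "false"));
  }

  public boolean isLeaderPersistsLocallyBeforeProducingToVT() {
    return leaderPersistsLocallyBeforeProducingToVT;
  }
}
```

Defaulting the flag to false would keep existing deployments on the current behavior until the new ordering is opted into, which matches how the PR threads the boolean through existing call sites.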