Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Add threadsafe mode to venice-server which adjusts message processing order #910

Open
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

ZacAttack
Copy link
Contributor

[server][WIP] Add threadsafe mode to venice-server which adjusts message processing order

This is an initial phase PR. It is seen as the minimal set of changes needed in order to add a mode where writes on leader are committed to rocksdb prior to producing. This change in order has the following impacts:

Drainer is skipped on leaders:
In a later refactor it might be prudent to remove the drainer entirely. However, in order to best accomodate that, it would likely make sense to execute a kind of batching logic when flushing to rocksdb. We do not attempt to make this change in this PR.

DCR logic must change:
Since writes are persisted to rocksdb prior to producing to Kafka, we now must accomodate for the possibility of left over state on a leader. To address this, we add a new mode to the merge conflict resolution logic where upon a perfect tie (on value and timestamp), we resolve to produce the repeated record. The intention here is to be able to be certain that a write which was persisted to rocksdb on leader but not produced doesn't end up getting lost due to failing DCR.

Transient Record is disabled
transient record cache is disabled for those ingestion tasks which enable this mode. This is itself was one of the goals, but we should go here with some validation. Most clusters in production end up seeing pretty low cache hit rate on transient record cache in production, however, there is at least one use case that gets as high as a 20% hit rate. Theoretically, we may be able to avoid taking too much hit here as we are able to give the memory savings to rocksdb cache, but this needs vetting. If this doesn't work, then we will need to replace the transient record cache with a simple size/time based cache.

There are also some cleanups here and there. Getting rid of some code paths that we no longer need and cleaning up others.

NOTE: Integration tests haven't been completely added to this PR yet. Part of that is because while switching some of the existing integration tests to this mode, some tests are failing. This needs some more diagnosis. Hence the WIP tag.

Resolves #XXX

How was this PR tested?

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

…age processing order

This is an initial phase PR.  It is seen as the minimal set of changes needed in order to add a mode where writes on leader are committed to rocksdb prior to producing.  This change in order has the following impacts:

-Drainer is skipped on leaders:
 In a later refactor it might be prudent to remove the drainer entirely.  However, in order to best accomodate that, it would likely make sense to execute a kind of batching logic when flushing to rocksdb.  We do not attempt to make this change in this PR.

-DCR logic must change
 Since writes are persisted to rocksdb prior to producing to Kafka, we now must accomodate for the possibility of left over state on a leader.  To address this, we add a new mode to the merge conflict resolution logic where upon a perfect tie (on value and timestamp), we resolve to produce the repeated record.  The intention here is to be able to be certain that a write which was persisted to rocksdb on leader but not produced doesn't end up getting lost due to failing DCR.

-Transient Record is disabled
 transient record cache is disabled for those ingestion tasks which enable this mode.  This is itself was one of the goals, but we should go here with some validation.  Most clusters in production end up seeing pretty low cache hit rate on transient record cache in production, however, there is at least one use case that gets as high as a 20% hit rate.  Theoretically, we may be able to avoid taking too much hit here as we are able to give the memory savings to rocksdb cache, but this needs vetting.  If this doesn't work, then we will need to replace the transient record cache with a simple size/time based cache.

There are also some cleanups here and there.  Getting rid of some code paths that we no longer need and cleaning up others.

NOTE: Integration tests haven't been completely added to this PR yet.  Part of that is because while switching some of the existing integration tests to this mode, some tests are failing.  This needs some more diagnosis.  Hence the WIP tag.
Copy link
Contributor

@sixpluszero sixpluszero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change! Overall looks great, I leave some comment, especially about TR.

Copy link
Contributor

@sixpluszero sixpluszero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update, now I am not sure if I totally follow the logic as I post my concern in one of the reply......we probably should chat a bit offline to chew the code change.
Also, we probably need some tests coverage to prove our old mode can fail in the race condition case and new mode is correct, even if my concern is eventually proved invalid...

@@ -127,6 +127,7 @@ public void setUp() {
Properties serverProperties = new Properties();
serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this mode is default false, can we make it to run both true and false for some sophisticated tests for AAWC?

@ZacAttack ZacAttack changed the title [server][WIP] Add threadsafe mode to venice-server which adjusts message processing order [server] Add threadsafe mode to venice-server which adjusts message processing order Aug 1, 2024
@@ -2834,6 +2885,14 @@ private int internalProcessConsumerRecord(
LOGGER.error("Failed to record Record heartbeat with message: ", e);
}
} else {
// TODO: This is a hack. Today the code kind of does a backdoor change to a key in the leaderProducer callback.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I don't really follow this hack, can you explain a bit? I remember the issue being the leader does not chunk the record before it is produced to VT, and it seems like here it does not solve the issue? So the Leader and the Follower will still have different view?

Copy link
Contributor

@FelixGV FelixGV left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this! I wish I would have reviewed it sooner, but better late than never I guess... my comments are pretty minor, I think... but I have a few questions as well.

I don't see which integration tests are disabled. Is this a stale part of the commit message / PR body? Let's delete that part if it is stale...

Thanks again!

@@ -453,6 +454,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final int ingestionTaskMaxIdleCount;

private final boolean threadSafeMode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably find another name for this functionality, since technically the old code is also supposed to be threadsafe... it's just that the new code is intended to make it easier to maintain thread-safety (less likely to introduce concurrency bugs)...

I guess the most significant functional change with this mode is that the leader persists changes locally prior to writing to Kafka, and as a result the TransientRecordCache becomes unnecessary. Perhaps a name along those lines might be more clear?

How about: leaderPersistsLocallyBeforeProducingToVT / leader.persists.locally.before.producing.to.vt

It's a bit of a mouthful, but seems more clear... a more concise version might be "Leader Persists Before Producing", and in day-to-day operations we might end up calling it "LPBP", for short. IDK, I'm just riffing at this point 😂

Open to other suggestions too, of course.

* The below flow must be executed in a critical session for the same key:
* The below flow must be executed in a critical section for the same key:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 😄 ...

if (this.syncOffsetsOnlyAfterProducing && !chunkDeprecation) {
// sync offsets
ingestionTask
.maybeSyncOffsets(consumedRecord, leaderProducedRecordContext, partitionConsumptionState, subPartition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the PCS instance passed here has a chance of having been modified by another thread (the processing thread)? I imagine we don't clone the PCS in order to make them immutable, so I wonder if what we would be checkpointing here is guaranteed to represent the correct state, up until what was just produced, rather than up until what's been consumed by another thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good catch, yeah it could happen.

/**
* The consumer record has been put into drainer queue; the following cases will result in putting to drainer directly:
* The consumer record needs to be put into drainer queue; the following cases will result in putting to drainer directly:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should delete the O/B/O reference below, right?

Comment on lines -6 to -8
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.PUT_ONLY_PART_LENGTH_FIELD_POS;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_COLO_ID_FIELD_POS;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_POS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: why are we removing those? It makes the code a bit more verbose, and it adds trivially modified lines into the git blame, which inflates the size of an already complex PR... is there any benefit? Perhaps there is ambiguity between the constants coming from different classes? (In which case I'd be fine with keeping the change...)

Comment on lines -30 to +34
final boolean newFieldCompletelyReplaceOldField = newFieldValue != oldFieldValue;
if (newFieldCompletelyReplaceOldField) {

if (newFieldValue != oldFieldValue) {
oldRecord.put(oldRecordField.pos(), newFieldValue);
}
return newFieldCompletelyReplaceOldField
? UpdateResultStatus.COMPLETELY_UPDATED
: UpdateResultStatus.NOT_UPDATED_AT_ALL;

return UpdateResultStatus.COMPLETELY_UPDATED;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess this is the change referenced in the commit message about:

DCR logic must change:
Since writes are persisted to rocksdb prior to producing to Kafka, we now must accomodate for the possibility of left over state on a leader. To address this, we add a new mode to the merge conflict resolution logic where upon a perfect tie (on value and timestamp), we resolve to produce the repeated record. The intention here is to be able to be certain that a write which was persisted to rocksdb on leader but not produced doesn't end up getting lost due to failing DCR.

I don't understand this, however... why would there be changes persisted in the leader but not in the VT, in cases of a perfect tie? If it is a perfect tie, then I assume the leader need not persist any changes either, right...?

setNewMapActiveElementAndTs(
setNewMapActiveElementAndTimestamp(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick... I would prefer trivial renames to be in a separate PR, to minimize the size of complex PRs... but up to you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants