From 7b7a328038d8b43487d8f21d098929b3478caaaf Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Wed, 20 Mar 2024 20:13:37 -0700 Subject: [PATCH 1/5] [server][WIP] Add threadsafe mode to venice-server which adjusts message processing order This is an initial phase PR. It is seen as the minimal set of changes needed in order to add a mode where writes on leader are committed to rocksdb prior to producing. This change in order has the following impacts: -Drainer is skipped on leaders: In a later refactor it might be prudent to remove the drainer entirely. However, in order to best accommodate that, it would likely make sense to execute a kind of batching logic when flushing to rocksdb. We do not attempt to make this change in this PR. -DCR logic must change: Since writes are persisted to rocksdb prior to producing to Kafka, we now must accommodate the possibility of leftover state on a leader. To address this, we add a new mode to the merge conflict resolution logic where upon a perfect tie (on value and timestamp), we resolve to produce the repeated record. The intention here is to be certain that a write which was persisted to rocksdb on leader but not produced doesn't end up getting lost due to failing DCR. -Transient Record is disabled: The transient record cache is disabled for those ingestion tasks which enable this mode. This in itself was one of the goals, but we should proceed here with some validation. Most clusters in production end up seeing a pretty low cache hit rate on the transient record cache; however, there is at least one use case that gets as high as a 20% hit rate. Theoretically, we may be able to avoid taking too much of a hit here as we are able to give the memory savings to rocksdb cache, but this needs vetting. If this doesn't work, then we will need to replace the transient record cache with a simple size/time based cache. There are also some cleanups here and there.
Getting rid of some code paths that we no longer need and cleaning up others. NOTE: Integration tests haven't been completely added to this PR yet. Part of that is because while switching some of the existing integration tests to this mode, some tests are failing. This needs some more diagnosis. Hence the WIP tag. --- .../davinci/config/VeniceServerConfig.java | 140 +----------------- .../ActiveActiveProducerCallback.java | 6 +- .../ActiveActiveStoreIngestionTask.java | 9 +- .../LeaderFollowerStoreIngestionTask.java | 45 ++++-- .../consumer/LeaderProducerCallback.java | 36 ++++- .../consumer/PartitionConsumptionState.java | 14 +- .../kafka/consumer/StoreIngestionTask.java | 109 ++++++++++---- .../merge/MergeConflictResolver.java | 69 ++++++--- .../PerFieldTimestampMergeRecordHelper.java | 9 +- .../SortBasedCollectionFieldOpHandler.java | 71 ++++----- .../ActiveActiveStoreIngestionTaskTest.java | 2 +- .../consumer/LeaderProducerCallbackTest.java | 6 +- .../PartitionConsumptionStateTest.java | 6 +- .../StorageUtilizationManagerTest.java | 2 +- .../consumer/StoreIngestionTaskTest.java | 8 +- .../replication/merge/TestMergePut.java | 7 +- .../replication/merge/TestMergeUpdate.java | 7 +- .../java/com/linkedin/venice/ConfigKeys.java | 2 + .../endToEnd/TestActiveActiveIngestion.java | 1 + .../StorageReadRequestHandlerTest.java | 1 + 20 files changed, 294 insertions(+), 256 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 2e1ddd0808..03d3509e91 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -2,138 +2,7 @@ import static com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils.INGESTION_ISOLATION_CONFIG_PREFIX; import static 
com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_TOTAL_MEMTABLE_USAGE_CAP_IN_BYTES; -import static com.linkedin.venice.ConfigKeys.AUTOCREATE_DATA_PATH; -import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; -import static com.linkedin.venice.ConfigKeys.DIV_PRODUCER_STATE_MAX_AGE_MS; -import static com.linkedin.venice.ConfigKeys.ENABLE_GRPC_READ_SERVER; -import static com.linkedin.venice.ConfigKeys.ENABLE_SERVER_ALLOW_LIST; -import static com.linkedin.venice.ConfigKeys.FAST_AVRO_FIELD_LIMIT_PER_METHOD; -import static com.linkedin.venice.ConfigKeys.FREEZE_INGESTION_IF_READY_TO_SERVE_OR_LOCAL_DATA_EXISTS; -import static com.linkedin.venice.ConfigKeys.GRPC_READ_SERVER_PORT; -import static com.linkedin.venice.ConfigKeys.GRPC_SERVER_WORKER_THREAD_COUNT; -import static com.linkedin.venice.ConfigKeys.HELIX_HYBRID_STORE_QUOTA_ENABLED; -import static com.linkedin.venice.ConfigKeys.HYBRID_QUOTA_ENFORCEMENT_ENABLED; -import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; -import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT_STORE_LIST; -import static com.linkedin.venice.ConfigKeys.INGESTION_MLOCK_ENABLED; -import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; -import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; -import static com.linkedin.venice.ConfigKeys.KAFKA_PRODUCER_METRICS; -import static com.linkedin.venice.ConfigKeys.KAFKA_READ_ONLY_ADMIN_CLASS; -import static com.linkedin.venice.ConfigKeys.KAFKA_WRITE_ONLY_ADMIN_CLASS; -import static com.linkedin.venice.ConfigKeys.KEY_VALUE_PROFILING_ENABLED; -import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED; -import static com.linkedin.venice.ConfigKeys.LEADER_FOLLOWER_STATE_TRANSITION_THREAD_POOL_STRATEGY; -import static com.linkedin.venice.ConfigKeys.LISTENER_HOSTNAME; -import static com.linkedin.venice.ConfigKeys.LISTENER_PORT; -import static com.linkedin.venice.ConfigKeys.LOCAL_CONTROLLER_D2_SERVICE_NAME; 
-import static com.linkedin.venice.ConfigKeys.LOCAL_CONTROLLER_URL; -import static com.linkedin.venice.ConfigKeys.LOCAL_D2_ZK_HOST; -import static com.linkedin.venice.ConfigKeys.MAX_FUTURE_VERSION_LEADER_FOLLOWER_STATE_TRANSITION_THREAD_NUMBER; -import static com.linkedin.venice.ConfigKeys.MAX_LEADER_FOLLOWER_STATE_TRANSITION_THREAD_NUMBER; -import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_CONCURRENCY; -import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_TIMEOUT_MS; -import static com.linkedin.venice.ConfigKeys.MIN_CONSUMER_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER; -import static com.linkedin.venice.ConfigKeys.OFFSET_LAG_DELTA_RELAX_FACTOR_FOR_FAST_ONLINE_TRANSITION_IN_RESTART; -import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS; -import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE; -import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE; -import static com.linkedin.venice.ConfigKeys.PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS; -import static com.linkedin.venice.ConfigKeys.PUB_SUB_CONSUMER_ADAPTER_FACTORY_CLASS; -import static com.linkedin.venice.ConfigKeys.PUB_SUB_PRODUCER_ADAPTER_FACTORY_CLASS; -import static com.linkedin.venice.ConfigKeys.ROUTER_PRINCIPAL_NAME; -import static com.linkedin.venice.ConfigKeys.SERVER_BLOCKING_QUEUE_TYPE; -import static com.linkedin.venice.ConfigKeys.SERVER_COMPUTE_FAST_AVRO_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_COMPUTE_QUEUE_CAPACITY; -import static com.linkedin.venice.ConfigKeys.SERVER_COMPUTE_THREAD_NUM; -import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER; -import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_LOOKUP_QUEUE_CAPACITY; -import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_MEMORY_STATS_ENABLED; 
-import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE; -import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_TRANSACTIONAL_MODE; -import static com.linkedin.venice.ConfigKeys.SERVER_DB_READ_ONLY_FOR_BATCH_ONLY_STORE_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_DEBUG_LOGGING_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_SIZE_FOR_AA_WC_LEADER; -import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD; -import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_INTERVAL_IN_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_SERVICE_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_TIMEOUT_IN_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_ENABLE_LIVE_CONFIG_BASED_KAFKA_THROTTLING; -import static com.linkedin.venice.ConfigKeys.SERVER_ENABLE_PARALLEL_BATCH_GET; -import static com.linkedin.venice.ConfigKeys.SERVER_FORKED_PROCESS_JVM_ARGUMENT_LIST; -import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_HEADER_TABLE_SIZE; -import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INITIAL_WINDOW_SIZE; -import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_CONCURRENT_STREAMS; -import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_FRAME_SIZE; -import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_HEADER_LIST_SIZE; -import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_CHECKPOINT_DURING_GRACEFUL_SHUTDOWN_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS; -import static 
com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_APPLICATION_PORT; -import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT; -import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE; -import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_MAX_IDLE_COUNT; -import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS; -import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_PRODUCER_POOL_SIZE_PER_KAFKA_CLUSTER; -import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; -import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES; -import static com.linkedin.venice.ConfigKeys.SERVER_LOCAL_CONSUMER_CONFIG_PREFIX; -import static com.linkedin.venice.ConfigKeys.SERVER_MAX_REQUEST_SIZE; -import static com.linkedin.venice.ConfigKeys.SERVER_MAX_WAIT_FOR_VERSION_INFO_MS_CONFIG; -import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_IDLE_TIME_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_WORKER_THREADS; -import static com.linkedin.venice.ConfigKeys.SERVER_NODE_CAPACITY_RCU; -import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND; -import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND; -import static com.linkedin.venice.ConfigKeys.SERVER_NUM_SCHEMA_FAST_CLASS_WARMUP; -import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_ENABLED; -import static 
com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_NO_READ_THRESHOLD_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_SERVICE_SCHEDULE_INTERNAL_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_BATCH_GET_CHUNK_SIZE; -import static com.linkedin.venice.ConfigKeys.SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS; -import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES; -import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX; -import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_REST_SERVICE_EPOLL_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_REST_SERVICE_STORAGE_THREAD_NUM; -import static com.linkedin.venice.ConfigKeys.SERVER_ROCKSDB_STORAGE_CONFIG_CHECK_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_ROUTER_CONNECTION_WARMING_DELAY_MS; -import static com.linkedin.venice.ConfigKeys.SERVER_SCHEMA_FAST_CLASS_WARMUP_TIMEOUT; -import static com.linkedin.venice.ConfigKeys.SERVER_SCHEMA_PRESENCE_CHECK_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY; -import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_CONSUMER_NON_EXISTING_TOPIC_CLEANUP_DELAY_MS; -import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_KAFKA_PRODUCER_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_SHUTDOWN_DISK_UNHEALTHY_TIME_MS; -import static com.linkedin.venice.ConfigKeys.SERVER_SOURCE_TOPIC_OFFSET_CHECK_INTERVAL_MS; -import static com.linkedin.venice.ConfigKeys.SERVER_SSL_HANDSHAKE_QUEUE_CAPACITY; -import 
static com.linkedin.venice.ConfigKeys.SERVER_SSL_HANDSHAKE_THREAD_POOL_SIZE; -import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_STORE_TO_EARLY_TERMINATION_THRESHOLD_MS_MAP; -import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND; -import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND; -import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; -import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH; -import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED; -import static com.linkedin.venice.ConfigKeys.SORTED_INPUT_DRAINER_SIZE; -import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED; -import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_MEMORY_CAPACITY; -import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_NOTIFY_DELTA; -import static com.linkedin.venice.ConfigKeys.STORE_WRITER_NUMBER; -import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_CLUSTER_NAME; -import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED; -import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED; -import static com.linkedin.venice.ConfigKeys.UNSORTED_INPUT_DRAINER_SIZE; +import static com.linkedin.venice.ConfigKeys.*; import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE; import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory; @@ -458,6 +327,8 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final int ingestionTaskMaxIdleCount; + private final boolean threadSafeMode; + private 
final long metaStoreWriterCloseTimeoutInMS; private final int metaStoreWriterCloseConcurrency; @@ -763,6 +634,7 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map perform DCR and decide incoming value wins * -> update transient record cache -> produce to VT (just call send, no need to wait for the produce future in the critical session) * @@ -499,7 +499,6 @@ protected void processMessageAndMaybeProduceToKafka( .updateLatestIgnoredUpstreamRTOffset(kafkaClusterIdToUrlMap.get(kafkaClusterId), sourceOffset); } else { validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation); - // Apply this update to any views for this store // TODO: It'd be good to be able to do this in LeaderFollowerStoreIngestionTask instead, however, AA currently is // the @@ -1578,7 +1577,8 @@ protected LeaderProducerCallback createProducerCallback( LeaderProducedRecordContext leaderProducedRecordContext, int subPartition, String kafkaUrl, - long beforeProcessingRecordTimestampNs) { + long beforeProcessingRecordTimestampNs, + boolean syncOffsetsOnlyAfterProducing) { return new ActiveActiveProducerCallback( this, consumerRecord, @@ -1586,6 +1586,7 @@ protected LeaderProducerCallback createProducerCallback( leaderProducedRecordContext, subPartition, kafkaUrl, - beforeProcessingRecordTimestampNs); + beforeProcessingRecordTimestampNs, + syncOffsetsOnlyAfterProducing); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index a4e8716c34..bf360541cd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -1582,13 +1582,27 @@ protected void produceToLocalKafka( 
String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs) { + + if (this.runInThreadSafeMode) { + // Write to rocksdb. At time of writing, this is the last step after a huge amount of processing and compression + // and whatnot. At this stage we do not sync the offset, instead doing that after successfully produce. + this.processConsumerRecord( + consumerRecord, + leaderProducedRecordContext, + subPartition, + kafkaUrl, + beforeProcessingRecordTimestampNs, + false); + } + LeaderProducerCallback callback = createProducerCallback( consumerRecord, partitionConsumptionState, leaderProducedRecordContext, subPartition, kafkaUrl, - beforeProcessingRecordTimestampNs); + beforeProcessingRecordTimestampNs, + this.runInThreadSafeMode); long sourceTopicOffset = consumerRecord.getOffset(); LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(sourceTopicOffset, kafkaClusterId); partitionConsumptionState.setLastLeaderPersistFuture(leaderProducedRecordContext.getPersistedToDBFuture()); @@ -2076,7 +2090,8 @@ private void propagateHeartbeatFromUpstreamTopicToLocalVersionTopic( leaderProducedRecordContext, partition, kafkaUrl, - beforeProcessingRecordTimestampNs); + beforeProcessingRecordTimestampNs, + this.runInThreadSafeMode); LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(consumerRecord.getOffset(), kafkaClusterId); List subPartitions = PartitionUtils.getSubPartitions(partitionConsumptionState.getUserPartition(), amplificationFactor); @@ -2143,7 +2158,7 @@ protected void recordHeartbeatReceived( * This function should be called as one of the first steps in processing pipeline for all messages consumed from any kafka topic. * * The caller of this function should only process this {@param consumerRecord} further if the return is - * {@link DelegateConsumerRecordResult#QUEUED_TO_DRAINER}. + * {@link DelegateConsumerRecordResult#QUEUE_TO_DRAINER}. 
* * This function assumes {@link #shouldProcessRecord(PubSubMessage, int)} has been called which happens in * {@link StoreIngestionTask#produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int)} @@ -2165,7 +2180,6 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( int kafkaClusterId, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs) { - boolean produceToLocalKafka = false; try { KafkaKey kafkaKey = consumerRecord.getKey(); KafkaMessageEnvelope kafkaValue = consumerRecord.getValue(); @@ -2181,9 +2195,9 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(subPartition); if (partitionConsumptionState == null) { // The partition is likely unsubscribed, will skip these messages. - return DelegateConsumerRecordResult.SKIPPED_MESSAGE; + return DelegateConsumerRecordResult.END_PROCESSING; } - produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState); + boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState); // UPDATE message is only expected in LEADER which must be produced to kafka. MessageType msgType = MessageType.valueOf(kafkaValue); if (msgType == MessageType.UPDATE && !produceToLocalKafka) { @@ -2205,7 +2219,10 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( * (i) it's a follower or (ii) leader is consuming from VT */ if (!produceToLocalKafka) { - return DelegateConsumerRecordResult.QUEUED_TO_DRAINER; + // TODO: The next step will put in the drainer queue. When threadsafe mode is enabled, it means we skip + // the drainer during rt consumption and commit straight to rocksdb. To remove drainer completely, + // we should do the same here + return DelegateConsumerRecordResult.QUEUE_TO_DRAINER; } // If we are here the message must be produced to local kafka or silently consumed. 
@@ -2249,7 +2266,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( */ divErrorMetricCallback.accept(e); LOGGER.debug("{} : Skipping a duplicate record at offset: {}", ingestionTaskName, consumerRecord.getOffset()); - return DelegateConsumerRecordResult.DUPLICATE_MESSAGE; + return DelegateConsumerRecordResult.END_PROCESSING; } if (kafkaKey.isControlMessage()) { @@ -2405,7 +2422,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( if (isDataRecovery && !partitionConsumptionState.isBatchOnly()) { // Ignore remote VT's TS message since we might need to consume more RT or incremental push data from VT // that's no longer in the local/remote RT due to retention. - return DelegateConsumerRecordResult.SKIPPED_MESSAGE; + return DelegateConsumerRecordResult.END_PROCESSING; } leaderProducedRecordContext = LeaderProducedRecordContext.newControlMessageRecord(kafkaKey.getKey(), controlMessage); @@ -2426,7 +2443,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( beforeProcessingRecordTimestampNs); break; case VERSION_SWAP: - return DelegateConsumerRecordResult.QUEUED_TO_DRAINER; + return DelegateConsumerRecordResult.QUEUE_TO_DRAINER; default: // do nothing break; @@ -2456,7 +2473,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( beforeProcessingRecordTimestampNs, currentTimeForMetricsMs); } - return DelegateConsumerRecordResult.PRODUCED_TO_KAFKA; + return DelegateConsumerRecordResult.END_PROCESSING; } catch (Exception e) { throw new VeniceException( ingestionTaskName + " hasProducedToKafka: exception for message received from: " @@ -3396,7 +3413,8 @@ protected LeaderProducerCallback createProducerCallback( LeaderProducedRecordContext leaderProducedRecordContext, int subPartition, String kafkaUrl, - long beforeProcessingRecordTimestampNs) { + long beforeProcessingRecordTimestampNs, + boolean syncOffsetsOnlyAfterProducing) { return new LeaderProducerCallback( this, consumerRecord, @@ -3404,7 +3422,8 @@ protected 
LeaderProducerCallback createProducerCallback( leaderProducedRecordContext, subPartition, kafkaUrl, - beforeProcessingRecordTimestampNs); + beforeProcessingRecordTimestampNs, + syncOffsetsOnlyAfterProducing); } protected Lazy> getVeniceWriter() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java index 8a8f654108..4490278762 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java @@ -55,6 +55,8 @@ public class LeaderProducerCallback implements ChunkAwareCallback { protected ChunkedValueManifest oldValueManifest = null; protected ChunkedValueManifest oldRmdManifest = null; + private final boolean syncOffsetsOnlyAfterProducing; + public LeaderProducerCallback( LeaderFollowerStoreIngestionTask ingestionTask, PubSubMessage sourceConsumerRecord, @@ -62,7 +64,8 @@ public LeaderProducerCallback( LeaderProducedRecordContext leaderProducedRecordContext, int subPartition, String kafkaUrl, - long beforeProcessingRecordTimestampNs) { + long beforeProcessingRecordTimestampNs, + boolean syncOffsetsOnlyAfterProducing) { this.ingestionTask = ingestionTask; this.sourceConsumerRecord = sourceConsumerRecord; this.partitionConsumptionState = partitionConsumptionState; @@ -71,6 +74,7 @@ public LeaderProducerCallback( this.leaderProducedRecordContext = leaderProducedRecordContext; this.produceTimeNs = ingestionTask.isUserSystemStore() ? 
0 : System.nanoTime(); this.beforeProcessingRecordTimestampNs = beforeProcessingRecordTimestampNs; + this.syncOffsetsOnlyAfterProducing = syncOffsetsOnlyAfterProducing; } @Override @@ -156,7 +160,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { */ if (chunkedValueManifest == null) { leaderProducedRecordContext.setProducedOffset(produceResult.getOffset()); - ingestionTask.produceToStoreBufferService( + produceToStoreBufferService( sourceConsumerRecord, leaderProducedRecordContext, subPartition, @@ -194,7 +198,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { manifestPut, leaderProducedRecordContext.getPersistedToDBFuture()); producedRecordForManifest.setProducedOffset(produceResult.getOffset()); - ingestionTask.produceToStoreBufferService( + produceToStoreBufferService( sourceConsumerRecord, producedRecordForManifest, subPartition, @@ -314,7 +318,7 @@ private long produceChunksToStoreBufferService( LeaderProducedRecordContext producedRecordForChunk = LeaderProducedRecordContext.newChunkPutRecord(ByteUtils.extractByteArray(chunkKey), chunkPut); producedRecordForChunk.setProducedOffset(-1); - ingestionTask.produceToStoreBufferService( + produceToStoreBufferService( sourceConsumerRecord, producedRecordForChunk, subPartition, @@ -340,7 +344,7 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man LeaderProducedRecordContext producedRecordForChunk = LeaderProducedRecordContext.newChunkDeleteRecord(ByteUtils.extractByteArray(chunkKey), chunkDelete); producedRecordForChunk.setProducedOffset(-1); - ingestionTask.produceToStoreBufferService( + produceToStoreBufferService( sourceConsumerRecord, producedRecordForChunk, subPartition, @@ -350,6 +354,28 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man } } + protected void produceToStoreBufferService( + PubSubMessage consumedRecord, + LeaderProducedRecordContext leaderProducedRecordContext, + int 
subPartition, + String kafkaUrl, + long beforeProcessingRecordTimestampNs, + long currentTimeForMetricsMs) throws InterruptedException { + if (this.syncOffsetsOnlyAfterProducing) { + // sync offsets + ingestionTask + .maybeSyncOffsets(consumedRecord, leaderProducedRecordContext, partitionConsumptionState, subPartition); + } else { + ingestionTask.produceToStoreBufferService( + consumedRecord, + leaderProducedRecordContext, + subPartition, + kafkaUrl, + beforeProcessingRecordTimestampNs, + currentTimeForMetricsMs); + } + } + // Visible for VeniceWriter unit test. public PartitionConsumptionState getPartitionConsumptionState() { return partitionConsumptionState; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index 1df4beae7e..81c33eaff7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -217,7 +217,14 @@ public class PartitionConsumptionState { */ private boolean firstHeartBeatSOSReceived; - public PartitionConsumptionState(int partition, int amplificationFactor, OffsetRecord offsetRecord, boolean hybrid) { + private boolean threadSafeMode; + + public PartitionConsumptionState( + int partition, + int amplificationFactor, + OffsetRecord offsetRecord, + boolean hybrid, + boolean threadSafeMode) { this.partition = partition; this.amplificationFactor = amplificationFactor; this.userPartition = PartitionUtils.getUserPartition(partition, amplificationFactor); @@ -230,6 +237,7 @@ public PartitionConsumptionState(int partition, int amplificationFactor, OffsetR this.processedRecordSizeSinceLastSync = 0; this.leaderFollowerState = LeaderFollowerStateType.STANDBY; this.expectedSSTFileChecksum = null; + this.threadSafeMode = 
threadSafeMode; /** * Initialize the latest consumed time with current time; otherwise, it's 0 by default * and leader will be promoted immediately. @@ -546,6 +554,10 @@ public void setTransientRecord( int valueLen, int valueSchemaId, GenericRecord replicationMetadataRecord) { + if (this.threadSafeMode) { + // NoOp + return; + } TransientRecord transientRecord = new TransientRecord(value, valueOffset, valueLen, valueSchemaId, kafkaClusterId, kafkaConsumedOffset); if (replicationMetadataRecord != null) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 4f5c17b81a..20583bf36e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -323,6 +323,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { private final String[] msgForLagMeasurement; private final Runnable runnableForKillIngestionTasksForNonCurrentVersions; + protected final boolean runInThreadSafeMode; + public StoreIngestionTask( StoreIngestionTaskFactory.Builder builder, Store store, @@ -470,6 +472,7 @@ public StoreIngestionTask( this.runnableForKillIngestionTasksForNonCurrentVersions = builder.getRunnableForKillIngestionTasksForNonCurrentVersions(); this.ingestionTaskMaxIdleCount = serverConfig.getIngestionTaskMaxIdleCount(); + this.runInThreadSafeMode = serverConfig.isThreadSafeMode(); } /** Package-private on purpose, only intended for tests. Do not use for production use cases. 
*/ @@ -1040,6 +1043,8 @@ protected void produceToStoreBufferServiceOrKafka( int subPartition = PartitionUtils.getSubPartition(topicPartition, amplificationFactor); boolean metricsEnabled = emitMetrics.get(); long currentTimeForMetricsMs = System.currentTimeMillis(); + + // Loop through all polled messages and process for (PubSubMessage record: records) { long beforeProcessingRecordTimestampNs = System.nanoTime(); PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(subPartition); @@ -1052,7 +1057,6 @@ protected void produceToStoreBufferServiceOrKafka( } continue; } - if (record.getKey().isControlMessage()) { ControlMessage controlMessage = (ControlMessage) record.getValue().payloadUnion; if (ControlMessageType.valueOf(controlMessage.controlMessageType) == ControlMessageType.START_OF_PUSH) { @@ -1084,7 +1088,7 @@ protected void produceToStoreBufferServiceOrKafka( beforeProcessingRecordTimestampNs, currentTimeForMetricsMs); switch (delegateConsumerRecordResult) { - case QUEUED_TO_DRAINER: + case QUEUE_TO_DRAINER: long queuePutStartTimeInNS = metricsEnabled ? System.nanoTime() : 0; // blocking call @@ -1095,9 +1099,17 @@ protected void produceToStoreBufferServiceOrKafka( elapsedTimeForPuttingIntoQueue += LatencyUtils.getLatencyInMS(queuePutStartTimeInNS); } break; - case PRODUCED_TO_KAFKA: - case SKIPPED_MESSAGE: - case DUPLICATE_MESSAGE: + case PRODUCE_TO_KAFKA: + // TODO: PRODUCE_TO_KAFKA is an unused enum at this stage, we could delete it. Or, use it in a future + // refactor. + // It might be cleaner to treat processing of each kafka message as a state machine (we've previously + // discussed, + // leveraging the actor pattern, and this would be along the same lines). We would iterate until we get to a + // terminal state. 
+ // At the moment, delegateConsumerRecord bundles together quite a lot of steps, and that seems to complicate + // the code quite a bit + case END_PROCESSING: + // Nothing left to do for this message break; default: throw new VeniceException( @@ -1771,8 +1783,12 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws OffsetRecord offsetRecord = storageMetadataService.getLastOffset(topic, partition); // Let's try to restore the state retrieved from the OffsetManager - PartitionConsumptionState newPartitionConsumptionState = - new PartitionConsumptionState(partition, amplificationFactor, offsetRecord, hybridStoreConfig.isPresent()); + PartitionConsumptionState newPartitionConsumptionState = new PartitionConsumptionState( + partition, + amplificationFactor, + offsetRecord, + hybridStoreConfig.isPresent(), + this.runInThreadSafeMode); newPartitionConsumptionState.setLeaderFollowerState(leaderState); partitionConsumptionStateMap.put(partition, newPartitionConsumptionState); @@ -1926,7 +1942,8 @@ private void resetOffset(int partition, PubSubTopicPartition topicPartition, boo partition, amplificationFactor, new OffsetRecord(partitionStateSerializer), - hybridStoreConfig.isPresent())); + hybridStoreConfig.isPresent(), + this.runInThreadSafeMode)); storageUtilizationManager.initPartition(partition); // Reset the error partition tracking partitionIngestionExceptionList.set(partition, null); @@ -2145,6 +2162,22 @@ public void processConsumerRecord( int subPartition, String kafkaUrl, long beforeProcessingRecordTimestampNs) { + processConsumerRecord( + record, + leaderProducedRecordContext, + subPartition, + kafkaUrl, + beforeProcessingRecordTimestampNs, + true); + } + + public void processConsumerRecord( + PubSubMessage record, + LeaderProducedRecordContext leaderProducedRecordContext, + int subPartition, + String kafkaUrl, + long beforeProcessingRecordTimestampNs, + boolean maybeSyncOffset) { // The partitionConsumptionStateMap can be modified by 
other threads during consumption (for example when // unsubscribing) // in order to maintain thread safety, we hold onto the reference to the partitionConsumptionState and pass that @@ -2229,9 +2262,28 @@ public void processConsumerRecord( && (partitionConsumptionState.getProcessedRecordSizeSinceLastSync() >= syncBytesInterval)); defaultReadyToServeChecker.apply(partitionConsumptionState, recordsProcessedAboveSyncIntervalThreshold); + if (maybeSyncOffset) { + maybeSyncOffsets(record, leaderProducedRecordContext, partitionConsumptionState, subPartition); + } + } + + /** + * Syncing offset checking in syncOffset() should be the very last step for processing a record. + */ + public void maybeSyncOffsets( + PubSubMessage record, + LeaderProducedRecordContext leaderProducedRecordContext, + PartitionConsumptionState partitionConsumptionState, + int subPartition) { + + long syncBytesInterval = partitionConsumptionState.isDeferredWrite() + ? databaseSyncBytesIntervalForDeferredWriteMode + : databaseSyncBytesIntervalForTransactionalMode; + boolean recordsProcessedAboveSyncIntervalThreshold = (syncBytesInterval > 0 + && (partitionConsumptionState.getProcessedRecordSizeSinceLastSync() >= syncBytesInterval)); + defaultReadyToServeChecker.apply(partitionConsumptionState, recordsProcessedAboveSyncIntervalThreshold); + /** - * Syncing offset checking in syncOffset() should be the very last step for processing a record. 
- * * Check whether offset metadata checkpoint will happen; if so, update the producer states recorded in OffsetRecord * with the updated producer states maintained in {@link #kafkaDataIntegrityValidator} */ @@ -3304,12 +3356,6 @@ private void waitReadyToProcessRecord(PubSubMessage oldValueAndRmd = @@ -318,7 +316,12 @@ private MergeConflictResult mergePutWithFieldLevelTimestamp( + storeName); } final GenericRecord oldValueFieldTimestampsRecord = (GenericRecord) oldTimestampObject; - if (ignoreNewPut(oldValueSchemaID, oldValueFieldTimestampsRecord, newValueSchemaID, putOperationTimestamp)) { + if (ignoreNewPut( + oldValueSchemaID, + oldValueFieldTimestampsRecord, + newValueSchemaID, + putOperationTimestamp, + newValueColoID)) { return MergeConflictResult.getIgnoredResult(); } final SchemaEntry mergeResultValueSchemaEntry = @@ -385,7 +388,7 @@ private MergeConflictResult mergeDeleteWithFieldLevelTimestamp( long deleteOperationTimestamp, long deleteOperationSourceOffset, int deleteOperationSourceBrokerID) { - if (ignoreNewDelete(oldValueFieldTimestampsRecord, deleteOperationTimestamp)) { + if (ignoreNewDelete(oldValueFieldTimestampsRecord, deleteOperationTimestamp, deleteOperationColoID)) { return MergeConflictResult.getIgnoredResult(); } // In this case, the writer and reader schemas are the same because deletion does not introduce any new schema. 
@@ -469,13 +472,18 @@ private boolean ignoreNewPut( final int oldValueSchemaID, GenericRecord oldValueFieldTimestampsRecord, final int newValueSchemaID, - final long putOperationTimestamp) { + final long putOperationTimestamp, + final int newValueColoId) { final Schema oldValueSchema = getValueSchema(oldValueSchemaID); List oldValueFields = oldValueSchema.getFields(); if (oldValueSchemaID == newValueSchemaID) { for (Schema.Field field: oldValueFields) { - if (isRmdFieldTimestampSmaller(oldValueFieldTimestampsRecord, field.name(), putOperationTimestamp, false)) { + if (isRmdFieldTimestampSmaller( + oldValueFieldTimestampsRecord, + field.name(), + putOperationTimestamp, + newValueColoId)) { return false; } } @@ -491,7 +499,11 @@ private boolean ignoreNewPut( if (oldFieldNames.containsAll(newFieldNames)) { // New value fields set is a subset of existing/old value fields set. for (String newFieldName: newFieldNames) { - if (isRmdFieldTimestampSmaller(oldValueFieldTimestampsRecord, newFieldName, putOperationTimestamp, false)) { + if (isRmdFieldTimestampSmaller( + oldValueFieldTimestampsRecord, + newFieldName, + putOperationTimestamp, + newValueColoId)) { return false; } } @@ -506,9 +518,16 @@ private boolean ignoreNewPut( } } - private boolean ignoreNewDelete(GenericRecord oldValueFieldTimestampsRecord, final long deleteOperationTimestamp) { + private boolean ignoreNewDelete( + GenericRecord oldValueFieldTimestampsRecord, + final long deleteOperationTimestamp, + int deleteOperationColoID) { for (Schema.Field field: oldValueFieldTimestampsRecord.getSchema().getFields()) { - if (isRmdFieldTimestampSmaller(oldValueFieldTimestampsRecord, field.name(), deleteOperationTimestamp, false)) { + if (isRmdFieldTimestampSmaller( + oldValueFieldTimestampsRecord, + field.name(), + deleteOperationTimestamp, + deleteOperationColoID)) { return false; } } @@ -527,19 +546,27 @@ private boolean isRmdFieldTimestampSmaller( GenericRecord oldValueFieldTimestampsRecord, String fieldName, final 
long newTimestamp, - final boolean strictlySmaller) { + final int newValueColoId) { final Object fieldTimestampObj = oldValueFieldTimestampsRecord.get(fieldName); final long oldFieldTimestamp; if (fieldTimestampObj instanceof Long) { oldFieldTimestamp = (Long) fieldTimestampObj; } else if (fieldTimestampObj instanceof GenericRecord) { - oldFieldTimestamp = (Long) ((GenericRecord) fieldTimestampObj).get(TOP_LEVEL_TS_FIELD_POS); + oldFieldTimestamp = (Long) ((GenericRecord) fieldTimestampObj).get(CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_POS); } else { throw new VeniceException( "Replication metadata field timestamp is expected to be either a long or a GenericRecord. " + "Got: " + fieldTimestampObj); } - return strictlySmaller ? (oldFieldTimestamp < newTimestamp) : (oldFieldTimestamp <= newTimestamp); + if (oldFieldTimestamp == newTimestamp) { + // break ties based on coloId if it's available + if (fieldTimestampObj instanceof GenericRecord) { + int oldColoId = + (int) ((GenericRecord) fieldTimestampObj).get(CollectionRmdTimestamp.TOP_LEVEL_COLO_ID_FIELD_NAME); + return oldColoId <= newValueColoId; + } + } + return (oldFieldTimestamp <= newTimestamp); } private MergeConflictResult putWithoutRmd( @@ -684,11 +711,12 @@ protected GenericRecord createPerFieldTimestampRecord( case RECORD: GenericRecord collectionFieldTimestampRecord = AvroSchemaUtils.createGenericRecord(field.schema()); // Only need to set the top-level field timestamp on collection timestamp record. - collectionFieldTimestampRecord.put(TOP_LEVEL_TS_FIELD_POS, fieldTimestamp); + collectionFieldTimestampRecord.put(CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_POS, fieldTimestamp); // When a collection field metadata is created, its top-level colo ID is always -1. 
- collectionFieldTimestampRecord.put(TOP_LEVEL_COLO_ID_FIELD_POS, -1); - collectionFieldTimestampRecord - .put(PUT_ONLY_PART_LENGTH_FIELD_POS, getCollectionFieldLen(oldValueRecord, field.name())); + collectionFieldTimestampRecord.put(CollectionRmdTimestamp.TOP_LEVEL_COLO_ID_FIELD_POS, -1); + collectionFieldTimestampRecord.put( + CollectionRmdTimestamp.PUT_ONLY_PART_LENGTH_FIELD_POS, + getCollectionFieldLen(oldValueRecord, field.name())); perFieldTimestampRecord.put(field.pos(), collectionFieldTimestampRecord); continue; @@ -722,7 +750,8 @@ private int getCollectionFieldLen(GenericRecord valueRecord, String collectionFi private boolean ignoreNewUpdate( final long updateOperationTimestamp, GenericRecord writeComputeRecord, - RmdWithValueSchemaId rmdWithValueSchemaId) { + RmdWithValueSchemaId rmdWithValueSchemaId, + final int newValueColoID) { if (rmdWithValueSchemaId == null) { return false; } @@ -757,7 +786,7 @@ private boolean ignoreNewUpdate( && timestampRecord.get(field.name()) == null) { return false; // Write Compute tries to update a non-existing field. } - if (isRmdFieldTimestampSmaller(timestampRecord, field.name(), updateOperationTimestamp, false)) { + if (isRmdFieldTimestampSmaller(timestampRecord, field.name(), updateOperationTimestamp, newValueColoID)) { return false; // One existing field must be updated. 
} } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/PerFieldTimestampMergeRecordHelper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/PerFieldTimestampMergeRecordHelper.java index ac2091e265..f66c45730f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/PerFieldTimestampMergeRecordHelper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/PerFieldTimestampMergeRecordHelper.java @@ -27,14 +27,11 @@ public UpdateResultStatus putOnField( } else if (oldTimestamp == newPutTimestamp) { Object oldFieldValue = oldRecord.get(oldRecordField.pos()); newFieldValue = compareAndReturn(oldFieldValue, newFieldValue, oldRecordField.schema()); - final boolean newFieldCompletelyReplaceOldField = newFieldValue != oldFieldValue; - if (newFieldCompletelyReplaceOldField) { + + if (newFieldValue != oldFieldValue) { oldRecord.put(oldRecordField.pos(), newFieldValue); } - return newFieldCompletelyReplaceOldField - ? UpdateResultStatus.COMPLETELY_UPDATED - : UpdateResultStatus.NOT_UPDATED_AT_ALL; - + return UpdateResultStatus.COMPLETELY_UPDATED; } else { // New field value wins. oldRecord.put(oldRecordField.pos(), newFieldValue); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java index 1c1674d867..563bedeaa4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java @@ -773,7 +773,7 @@ private UpdateResultStatus handleModifyPutOnlyMap( deletedKeys.sort(String::compareTo); // Step 3: Set new active elements and their active timestamps. 
- setNewMapActiveElementAndTs( + setNewMapActiveElementAndTimestamp( putOnlyPartMap, collectionMergePartKeys, newEntries, @@ -789,7 +789,7 @@ private UpdateResultStatus handleModifyPutOnlyMap( return UpdateResultStatus.PARTIALLY_UPDATED; } - private void setNewMapActiveElementAndTs( + private void setNewMapActiveElementAndTimestamp( final IndexedHashMap putOnlyPartMap, final List collectionMergePartKeys, Map newEntries, @@ -843,7 +843,7 @@ private UpdateResultStatus handleModifyCollectionMergeMap( currMap.forEach((key, value) -> currKeyValPairs.add(new KeyValPair(key, value))); final List activeTimestamps = collectionFieldRmd.getActiveElementTimestamps(); - final IndexedHashMap activeEntriesToTsMap = Utils.createElementToActiveTsMap( + final IndexedHashMap activeEntriesToTimestampMap = Utils.createElementToActiveTsMap( currKeyValPairs, activeTimestamps, collectionFieldRmd.getTopLevelFieldTimestamp(), @@ -852,7 +852,7 @@ private UpdateResultStatus handleModifyCollectionMergeMap( final List deletedKeys = collectionFieldRmd.getDeletedElements(); final List deletedTimestamps = collectionFieldRmd.getDeletedElementTimestamps(); - final IndexedHashMap deletedKeyToTsMap = + final IndexedHashMap deletedKeyToTimestampMap = Utils.createDeletedElementToTsMap(deletedKeys, deletedTimestamps, Long.MIN_VALUE); boolean updated = false; @@ -862,23 +862,23 @@ private UpdateResultStatus handleModifyCollectionMergeMap( // Step 1: Add elements (MAP_UNION). for (Map.Entry newEntry: newEntries.entrySet()) { final String newKey = newEntry.getKey(); - final Long deletedTimestamp = deletedKeyToTsMap.get(newKey); + final Long deletedTimestamp = deletedKeyToTimestampMap.get(newKey); if (deletedTimestamp != null) { // Key was deleted before. if (deletedTimestamp < modifyTimestamp) { // k-v entry will be added back. 
- deletedKeyToTsMap.remove(newKey); - activeEntriesToTsMap.put(new KeyValPair(newKey, newEntry.getValue()), modifyTimestamp); + deletedKeyToTimestampMap.remove(newKey); + activeEntriesToTimestampMap.put(new KeyValPair(newKey, newEntry.getValue()), modifyTimestamp); updated = true; } // Else: Key remains "deleted". } else { // Key was not deleted before. KeyValPair newKeyValue = new KeyValPair(newKey, newEntry.getValue()); - final Long activeTimestamp = activeEntriesToTsMap.get(newKeyValue); + final Long activeTimestamp = activeEntriesToTimestampMap.get(newKeyValue); if (activeTimestamp == null) { // The key does not exist before. - activeEntriesToTsMap.put(newKeyValue, modifyTimestamp); + activeEntriesToTimestampMap.put(newKeyValue, modifyTimestamp); updated = true; } else { // The key exist. @@ -886,17 +886,17 @@ private UpdateResultStatus handleModifyCollectionMergeMap( newPutOnlyPartLength--; } if (activeTimestamp < modifyTimestamp) { - activeEntriesToTsMap.remove(newKeyValue); - activeEntriesToTsMap.put(newKeyValue, modifyTimestamp); + activeEntriesToTimestampMap.remove(newKeyValue); + activeEntriesToTimestampMap.put(newKeyValue, modifyTimestamp); updated = true; } else if (activeTimestamp == modifyTimestamp) { // Note that if the current active timestamp is equal to the modify timestamp, we compare value. Object currentValue = currMap.get(newKey); Object newValue = newKeyValue.getVal(); - if (shouldUpdateMapFieldItemValueWithSameTs(currentValue, newValue, currValueRecordField.schema())) { - activeEntriesToTsMap.remove(newKeyValue); - activeEntriesToTsMap.put(newKeyValue, modifyTimestamp); + if (shouldUpdateMapFieldItemValueWithSameTimestamp(currentValue, newValue, currValueRecordField.schema())) { + activeEntriesToTimestampMap.remove(newKeyValue); + activeEntriesToTimestampMap.put(newKeyValue, modifyTimestamp); updated = true; } } @@ -906,32 +906,32 @@ private UpdateResultStatus handleModifyCollectionMergeMap( // Step 2: Remove elements (MAP_DIFF). 
for (String toRemoveKey: toRemoveKeys) { - final Long deletedTimestamp = deletedKeyToTsMap.get(toRemoveKey); + final Long deletedTimestamp = deletedKeyToTimestampMap.get(toRemoveKey); if (deletedTimestamp != null) { // This key was deleted before. if (deletedTimestamp < modifyTimestamp) { // Update the deleted timestamp of this key. - deletedKeyToTsMap.put(toRemoveKey, modifyTimestamp); + deletedKeyToTimestampMap.put(toRemoveKey, modifyTimestamp); updated = true; } } else { // This key was not deleted before and now it is deleted. final KeyValPair toRemove = new KeyValPair(toRemoveKey); - final Long activeTimestamp = activeEntriesToTsMap.get(toRemove); + final Long activeTimestamp = activeEntriesToTimestampMap.get(toRemove); if (activeTimestamp != null) { if (activeTimestamp <= modifyTimestamp) { // Delete an existing k-v entry. - activeEntriesToTsMap.remove(toRemove); + activeEntriesToTimestampMap.remove(toRemove); if (activeTimestamp == topLevelTimestamp) { // Delete a k-v pair from the put-only part. newPutOnlyPartLength--; } - deletedKeyToTsMap.put(toRemoveKey, modifyTimestamp); + deletedKeyToTimestampMap.put(toRemoveKey, modifyTimestamp); updated = true; } // Else: existing k-v entry does not get deleted. } else { // Key never existed and it should be marked as deleted now. - deletedKeyToTsMap.put(toRemoveKey, modifyTimestamp); + deletedKeyToTimestampMap.put(toRemoveKey, modifyTimestamp); updated = true; } } @@ -942,14 +942,14 @@ private UpdateResultStatus handleModifyCollectionMergeMap( } // Step 3: Set new active map entries and their active timestamps. 
- final List newActiveEntriesAndTsList = new ArrayList<>(activeEntriesToTsMap.size()); - activeEntriesToTsMap.forEach((activeEntry, activeTs) -> { + final List newActiveEntriesAndTsList = new ArrayList<>(activeEntriesToTimestampMap.size()); + activeEntriesToTimestampMap.forEach((activeEntry, activeTs) -> { newActiveEntriesAndTsList.add(new ElementAndTimestamp(activeEntry, activeTs)); }); sortElementAndTimestampListInMap( newActiveEntriesAndTsList.subList(newPutOnlyPartLength, newActiveEntriesAndTsList.size()), x -> ((KeyValPair) x).getKey()); - setNewMapActiveElementAndTs( + setNewMapActiveElementAndTimestamp( newActiveEntriesAndTsList, newPutOnlyPartLength, currValueRecord, @@ -957,13 +957,13 @@ private UpdateResultStatus handleModifyCollectionMergeMap( collectionFieldRmd); // Step 4: Set new deleted keys and their deleted timestamps. - final List newDeletedKeyAndTsList = new ArrayList<>(deletedKeyToTsMap.size()); + final List newDeletedKeyAndTimestampList = new ArrayList<>(deletedKeyToTimestampMap.size()); - deletedKeyToTsMap.forEach((k, v) -> newDeletedKeyAndTsList.add(new ElementAndTimestamp(k, v))); + deletedKeyToTimestampMap.forEach((k, v) -> newDeletedKeyAndTimestampList.add(new ElementAndTimestamp(k, v))); // The element here is String (as deleted key). So, we can use a String comparator. 
- sortElementAndTimestampListInMap(newDeletedKeyAndTsList, x -> (String) x); - setDeletedDeletedKeyAndTsList(newDeletedKeyAndTsList, collectionFieldRmd); + sortElementAndTimestampListInMap(newDeletedKeyAndTimestampList, x -> (String) x); + setDeletedDeletedKeyAndTimestampList(newDeletedKeyAndTimestampList, collectionFieldRmd); return UpdateResultStatus.PARTIALLY_UPDATED; } @@ -997,7 +997,7 @@ private Schema getSchemaFromNullableCollectionSchema(Schema nullableSchema, Sche return schema; } - private void setNewMapActiveElementAndTs( + private void setNewMapActiveElementAndTimestamp( List activeElementAndTsList, final int newPutOnlyPartLength, GenericRecord currValueRecord, @@ -1007,11 +1007,11 @@ private void setNewMapActiveElementAndTs( PrimitiveLongList newActiveTimestamps = new PrimitiveLongArrayList(activeElementAndTsList.size() - newPutOnlyPartLength); int idx = 0; - for (ElementAndTimestamp activeEntryAndTs: activeElementAndTsList) { - KeyValPair activeEntry = (KeyValPair) activeEntryAndTs.getElement(); + for (ElementAndTimestamp activeEntryAndTimestamp: activeElementAndTsList) { + KeyValPair activeEntry = (KeyValPair) activeEntryAndTimestamp.getElement(); newMap.put(activeEntry.getKey(), activeEntry.getVal()); if (idx >= newPutOnlyPartLength) { - newActiveTimestamps.addPrimitive(activeEntryAndTs.getTimestamp()); + newActiveTimestamps.addPrimitive(activeEntryAndTimestamp.getTimestamp()); } idx++; } @@ -1020,7 +1020,7 @@ private void setNewMapActiveElementAndTs( collectionFieldRmd.setPutOnlyPartLength(newPutOnlyPartLength); } - private void setDeletedDeletedKeyAndTsList( + private void setDeletedDeletedKeyAndTimestampList( List deletedElementAndTsList, CollectionRmdTimestamp collectionFieldRmd) { List deletedKeys = new ArrayList<>(deletedElementAndTsList.size()); @@ -1047,7 +1047,10 @@ private boolean ignoreIncomingRequest( return false; } - private boolean shouldUpdateMapFieldItemValueWithSameTs(Object currentValue, Object newValue, Schema fieldSchema) { + 
private boolean shouldUpdateMapFieldItemValueWithSameTimestamp( + Object currentValue, + Object newValue, + Schema fieldSchema) { /** * For complex map item value type, for example union type [null, item value type], it is possible that the item * value can be null. This is the safeguard to not compare with the null value and always let the not-null value win @@ -1073,6 +1076,6 @@ private boolean shouldUpdateMapFieldItemValueWithSameTs(Object currentValue, Obj if (mapValueSchema == null) { throw new VeniceException("Could not find map schema in map field: " + fieldSchema.toString(true)); } - return AvroCollectionElementComparator.INSTANCE.compare(newValue, currentValue, mapValueSchema) > 0; + return AvroCollectionElementComparator.INSTANCE.compare(newValue, currentValue, mapValueSchema) >= 0; } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index abc7fa5058..4fbb119091 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -258,7 +258,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() when(ingestionTask.getVersionIngestionStats()).thenReturn(mock(AggVersionedIngestionStats.class)); when(ingestionTask.getVersionedDIVStats()).thenReturn(mock(AggVersionedDIVStats.class)); when(ingestionTask.getKafkaVersionTopic()).thenReturn(testTopic); - when(ingestionTask.createProducerCallback(any(), any(), any(), anyInt(), anyString(), anyLong())) + when(ingestionTask.createProducerCallback(any(), any(), any(), anyInt(), anyString(), anyLong(), anyBoolean())) .thenCallRealMethod(); when(ingestionTask.getProduceToTopicFunction(any(), any(), any(), any(), any(), anyInt(), anyBoolean())) 
.thenCallRealMethod(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java index b6761a6037..481f6f491a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java @@ -73,7 +73,8 @@ public void testOnCompletionWithNonNullException() { leaderProducedRecordContextMock, subPartition, kafkaUrl, - beforeProcessingRecordTimestamp); + beforeProcessingRecordTimestamp, + false); int cbInvocations = 6; // (T1:1), (T2:21), (T1:1), (T2:22), (T1:1), (T2:22) String exMessage = "Producer is closed forcefully"; @@ -116,7 +117,8 @@ public void testLeaderProducerCallbackProduceDeprecatedChunkDeletion() throws In leaderProducedRecordContext, 0, "url", - 0); + 0, + false); ChunkedValueManifest manifest = new ChunkedValueManifest(); manifest.keysWithChunkIdSuffix = new ArrayList<>(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java index 25cd0bdd89..41babadd55 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java @@ -24,7 +24,7 @@ public class PartitionConsumptionStateTest { @Test public void testUpdateChecksum() { - PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false); + PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false, false); pcs.initializeExpectedChecksum(); byte[] rmdPayload = new 
byte[] { 127 }; byte[] key1 = new byte[] { 1 }; @@ -79,7 +79,7 @@ public void testUpdateChecksum() { */ @Test public void testTransientRecordMap() { - PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false); + PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false, false); byte[] key1 = new byte[] { 65, 66, 67, 68 }; byte[] key2 = new byte[] { 65, 66, 67, 68 }; @@ -126,7 +126,7 @@ public void testTransientRecordMap() { @Test public void testIsLeaderCompleted() { - PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false); + PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false, false); // default is LEADER_COMPLETE_STATE_UNKNOWN assertEquals(pcs.getLeaderCompleteState(), LeaderCompleteState.LEADER_COMPLETE_STATE_UNKNOWN); assertFalse(pcs.isLeaderCompleted()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManagerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManagerTest.java index 7cc6c01455..b9d814bd37 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManagerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManagerTest.java @@ -51,7 +51,7 @@ public void buildNewQuotaEnforcer() { partitionConsumptionStateMap = new VeniceConcurrentHashMap<>(); for (int i = 1; i <= storePartitionCount; i++) { - PartitionConsumptionState pcs = new PartitionConsumptionState(i, 1, mock(OffsetRecord.class), true); + PartitionConsumptionState pcs = new PartitionConsumptionState(i, 1, mock(OffsetRecord.class), true, false); partitionConsumptionStateMap.put(i, pcs); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 905e14152b..9aa6809fcc 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -3389,7 +3389,7 @@ public void testGetAndUpdateLeaderCompletedState( OffsetRecord mockOffsetRecord = mock(OffsetRecord.class); PartitionConsumptionState partitionConsumptionState = - new PartitionConsumptionState(PARTITION_FOO, amplificationFactor, mockOffsetRecord, true); + new PartitionConsumptionState(PARTITION_FOO, amplificationFactor, mockOffsetRecord, true, false); long producerTimestamp = System.currentTimeMillis(); LeaderMetadataWrapper mockLeaderMetadataWrapper = mock(LeaderMetadataWrapper.class); @@ -3682,7 +3682,8 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { OffsetRecord offsetRecord = mock(OffsetRecord.class); doReturn(pubSubTopicRepository.getTopic(versionTopicName)).when(offsetRecord).getLeaderTopic(any()); - PartitionConsumptionState partitionConsumptionState = new PartitionConsumptionState(0, 1, offsetRecord, false); + PartitionConsumptionState partitionConsumptionState = + new PartitionConsumptionState(0, 1, offsetRecord, false, false); long localVersionTopicOffset = 100L; long remoteVersionTopicOffset = 200L; @@ -4024,7 +4025,8 @@ public void testBatchOnlyStoreDataRecovery() { OffsetRecord offsetRecord = mock(OffsetRecord.class); doReturn(pubSubTopic).when(offsetRecord).getLeaderTopic(any()); - PartitionConsumptionState partitionConsumptionState = new PartitionConsumptionState(0, 1, offsetRecord, false); + PartitionConsumptionState partitionConsumptionState = + new PartitionConsumptionState(0, 1, offsetRecord, false, false); storeIngestionTaskUnderTest.updateLeaderTopicOnFollower(partitionConsumptionState); 
storeIngestionTaskUnderTest.startConsumingAsLeader(partitionConsumptionState); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergePut.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergePut.java index b9d465312d..dc26ce6125 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergePut.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergePut.java @@ -113,7 +113,8 @@ public void testPutIgnored() { 0); Assert.assertTrue(result.isUpdateIgnored()); - // Case2: Apply PUT operation with same TS but less colo ID + // Case2: Apply PUT operation with same TS but less colo ID. The result of this should be a non updated + // record, but the result is NOT ignored. result = mergeConflictResolver.put( Lazy.of(() -> serializeValueRecord(oldValueRecord)), new RmdWithValueSchemaId(schemaSet.getValueSchemaId(), RMD_VERSION_ID, oldRmdRecord), @@ -123,7 +124,9 @@ public void testPutIgnored() { 1L, 0, -2); - Assert.assertTrue(result.isUpdateIgnored()); + GenericRecord foo = deserializeValueRecord(result.getNewValue()); + Assert.assertFalse(result.isUpdateIgnored()); + Assert.assertEquals(foo, oldValueRecord); } /** diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdate.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdate.java index 3066f4b5a6..f9f70ecb83 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdate.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdate.java @@ -57,7 +57,12 @@ public void testRegularFieldUpdateIgnored(boolean rmdStartsWithFieldLevelTs, boo 1L, 0, 0); - Assert.assertTrue(result.isUpdateIgnored()); + if (isUpdateTsSmallerThanRmdTs) { + Assert.assertTrue(result.isUpdateIgnored()); + } else { + 
Assert.assertEquals(deserializeValueRecord(result.getNewValue()), oldValueRecord); + Assert.assertFalse(result.isUpdateIgnored()); + } } /** diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 466a2906a3..873186e2ab 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -421,6 +421,8 @@ private ConfigKeys() { "store.writer.buffer.after.leader.logic.enabled"; public static final String SERVER_INGESTION_TASK_MAX_IDLE_COUNT = "server.ingestion.task.max.idle.count"; + + public static final String SERVER_INGESTION_TASK_THREAD_SAFE_MODE = "server.ingestion.thread.safe.mode"; public static final String STORE_WRITER_BUFFER_MEMORY_CAPACITY = "store.writer.buffer.memory.capacity"; public static final String STORE_WRITER_BUFFER_NOTIFY_DELTA = "store.writer.buffer.notify.delta"; public static final String SERVER_REST_SERVICE_STORAGE_THREAD_NUM = "server.rest.service.storage.thread.num"; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index 88da782bbe..7ce1448824 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -126,6 +126,7 @@ public void setUp() { Properties serverProperties = new Properties(); serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1)); serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false); + serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, true); serverProperties.put( 
CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." + DEFAULT_PARENT_DATA_CENTER_REGION_NAME, "localhost:" + TestUtils.getFreePort()); diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java index 28bb1cab7d..71965046e7 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java @@ -377,6 +377,7 @@ public void testAdminRequestsPassInStorageExecutionHandler() throws Exception { expectedPartitionId, 1, new OffsetRecord(AvroProtocolDefinition.PARTITION_STATE.getSerializer()), + false, false); expectedAdminResponse.addPartitionConsumptionState(state); doReturn(expectedAdminResponse).when(metadataRetriever).getConsumptionSnapshots(eq(topic), any()); From 88adca9f2724f8b2e9249475a837010fd723cd6a Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Wed, 10 Apr 2024 17:14:41 -0700 Subject: [PATCH 2/5] address comments --- .../com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index df0f996283..959454287e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -127,7 +127,7 @@ public void setUp() { Properties serverProperties = new Properties(); serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1)); 
serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false); - serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, true); + serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false); serverProperties.put( CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." + DEFAULT_PARENT_DATA_CENTER_REGION_NAME, "localhost:" + TestUtils.getFreePort()); From 593cd0fe11c3148feee21d5f80aef41fc90570c9 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Wed, 24 Apr 2024 18:10:12 -0700 Subject: [PATCH 3/5] add chunking fix and set AAIngestion test to use thread safe mode --- .../davinci/kafka/consumer/StoreIngestionTask.java | 11 +++++++++++ .../venice/endToEnd/TestActiveActiveIngestion.java | 6 +++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 3e7d23a359..d1a7502ca2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -82,6 +82,7 @@ import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.pubsub.manager.TopicManagerRepository; import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.serializer.AvroGenericDeserializer; @@ -324,6 +325,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { private final String[] msgForLagMeasurement; private final Runnable runnableForKillIngestionTasksForNonCurrentVersions; + private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new 
KeyWithChunkingSuffixSerializer(); + protected final boolean runInThreadSafeMode; public StoreIngestionTask( @@ -2879,6 +2882,14 @@ private int internalProcessConsumerRecord( LOGGER.error("Failed to record Record heartbeat with message: ", e); } } else { + // TODO: This is a hack. Today the code kind of does a backdoor change to a key in the leaderProducer callback. + // However, in thread safe mode we need to persist before producing, so for real-time records of chunked + // stores the key bytes are re-serialized here with the non-chunked key suffix. + if (this.isChunked && this.runInThreadSafeMode && leaderProducedRecordContext != null + && consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) { + leaderProducedRecordContext + .setKeyBytes(keyWithChunkingSuffixSerializer.serializeNonChunkedKey(consumerRecord.getKey().getKey())); + } updateLatestInMemoryProcessedOffset( partitionConsumptionState, consumerRecord, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index 959454287e..4e7453774b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -127,7 +127,7 @@ public void setUp() { Properties serverProperties = new Properties(); serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1)); serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false); - serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false); + serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, true); serverProperties.put( CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "."
+ DEFAULT_PARENT_DATA_CENTER_REGION_NAME, "localhost:" + TestUtils.getFreePort()); @@ -410,10 +410,10 @@ public void testKIFRepushActiveActiveStore(boolean isChunkingEnabled) throws Exc ClientConfig.defaultGenericClientConfig(storeName) .setVeniceURL(clusterWrapper.getRandomRouterURL()) .setMetricsRepository(metricsRepository))) { - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + TestUtils.waitForNonDeterministicAssertion(300, TimeUnit.SECONDS, true, () -> { Assert.assertNotNull(client.get(Integer.toString(deleteWithRmdKeyIndex + 1)).get()); }); - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + TestUtils.waitForNonDeterministicAssertion(300, TimeUnit.SECONDS, true, () -> { Assert.assertNull(client.get(Integer.toString(deleteWithRmdKeyIndex)).get()); }); } From 8c0e60cf47163f7c381a5cd0fc07cc107c67d120 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Wed, 24 Apr 2024 18:12:32 -0700 Subject: [PATCH 4/5] reduce timeouts --- .../linkedin/venice/endToEnd/TestActiveActiveIngestion.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index 4e7453774b..df0f996283 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -410,10 +410,10 @@ public void testKIFRepushActiveActiveStore(boolean isChunkingEnabled) throws Exc ClientConfig.defaultGenericClientConfig(storeName) .setVeniceURL(clusterWrapper.getRandomRouterURL()) .setMetricsRepository(metricsRepository))) { - TestUtils.waitForNonDeterministicAssertion(300, TimeUnit.SECONDS, true, () -> { + 
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { Assert.assertNotNull(client.get(Integer.toString(deleteWithRmdKeyIndex + 1)).get()); }); - TestUtils.waitForNonDeterministicAssertion(300, TimeUnit.SECONDS, true, () -> { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { Assert.assertNull(client.get(Integer.toString(deleteWithRmdKeyIndex)).get()); }); } From 99451e1b93e810459511e7e15cd3bf54154748f1 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Wed, 31 Jul 2024 15:32:11 -0700 Subject: [PATCH 5/5] last tweaks for chunk deprecation --- .../kafka/consumer/LeaderProducerCallback.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java index 83ea27daf8..bdf10a80be 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java @@ -156,7 +156,8 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { partition, kafkaUrl, beforeProcessingRecordTimestampNs, - currentTimeForMetricsMs); + currentTimeForMetricsMs, + false); producedRecordNum++; producedRecordSize = Math.max(0, produceResult.getSerializedSize()); @@ -194,7 +195,8 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { partition, kafkaUrl, beforeProcessingRecordTimestampNs, - currentTimeForMetricsMs); + currentTimeForMetricsMs, + false); producedRecordNum++; producedRecordSize += key.length + manifest.remaining(); } @@ -321,7 +323,8 @@ private long produceChunksToStoreBufferService( partition, kafkaUrl, beforeProcessingRecordTimestampNs, - currentTimeForMetricsMs); + currentTimeForMetricsMs, + false); totalChunkSize 
+= chunkKey.remaining() + chunkValue.remaining(); } return totalChunkSize; @@ -347,7 +350,8 @@ void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest man partition, kafkaUrl, beforeProcessingRecordTimestampNs, - currentTimeForMetricsMs); + currentTimeForMetricsMs, + true); } } @@ -357,8 +361,9 @@ protected void produceToStoreBufferService( int subPartition, String kafkaUrl, long beforeProcessingRecordTimestampNs, - long currentTimeForMetricsMs) throws InterruptedException { - if (this.syncOffsetsOnlyAfterProducing) { + long currentTimeForMetricsMs, + boolean chunkDeprecation) throws InterruptedException { + if (this.syncOffsetsOnlyAfterProducing && !chunkDeprecation) { // sync offsets ingestionTask .maybeSyncOffsets(consumedRecord, leaderProducedRecordContext, partitionConsumptionState, subPartition);