diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index c317c065f2..38c7d6bbb9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -164,7 +164,9 @@ public ActiveActiveStoreIngestionTask( null, this::processActiveActiveMessage, isWriteComputationEnabled, - isActiveActiveReplicationEnabled()); + isActiveActiveReplicationEnabled(), + aggVersionedIngestionStats, + getHostLevelIngestionStats()); }); } @@ -608,7 +610,6 @@ protected void processMessageAndMaybeProduceToKafka( } PubSubMessage consumerRecord = consumerRecordWrapper.getMessage(); KafkaKey kafkaKey = consumerRecord.getKey(); - // KafkaMessageEnvelope kafkaValue = consumerRecord.getValue(); byte[] keyBytes = kafkaKey.getKey(); final MergeConflictResultWrapper mergeConflictResultWrapper; if (consumerRecordWrapper.getProcessedResult() != null diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java index 3e1a8202fd..299de9c0f0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java @@ -1,10 +1,14 @@ package com.linkedin.davinci.kafka.consumer; +import com.linkedin.davinci.stats.AggVersionedIngestionStats; +import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.utils.ByteArrayKey; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.utils.LatencyUtils; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -35,11 +39,15 @@ PubSubMessageProcessedResult apply( } private final String storeVersionName; + private final String storeName; + private final int version; private final ExecutorService batchProcessingThreadPool; private final KeyLevelLocksManager lockManager; private final boolean isWriteComputationEnabled; private final boolean isActiveActiveReplicationEnabled; private final ProcessingFunction processingFunction; + private final AggVersionedIngestionStats aggVersionedIngestionStats; + private final HostLevelIngestionStats hostLevelIngestionStats; public IngestionBatchProcessor( String storeVersionName, @@ -47,13 +55,20 @@ public IngestionBatchProcessor( KeyLevelLocksManager lockManager, ProcessingFunction processingFunction, boolean isWriteComputationEnabled, - boolean isActiveActiveReplicationEnabled) { + boolean isActiveActiveReplicationEnabled, + AggVersionedIngestionStats aggVersionedIngestionStats, + HostLevelIngestionStats hostLevelIngestionStats) { this.storeVersionName = storeVersionName; this.batchProcessingThreadPool = batchProcessingThreadPool; this.lockManager = lockManager; this.processingFunction = processingFunction; this.isWriteComputationEnabled = isWriteComputationEnabled; this.isActiveActiveReplicationEnabled = 
isActiveActiveReplicationEnabled; + this.aggVersionedIngestionStats = aggVersionedIngestionStats; + this.hostLevelIngestionStats = hostLevelIngestionStats; + + this.storeName = Version.parseStoreFromKafkaTopicName(storeVersionName); + this.version = Version.parseVersionFromKafkaTopicName(storeVersionName); } /** @@ -104,6 +119,7 @@ public List>> keyGroupMap = new HashMap<>(records.size()); - resultList.forEach(r -> { + + for (PubSubMessageProcessedResultWrapper r: resultList) { PubSubMessage message = r.getMessage(); if (!message.getKey().isControlMessage()) { ByteArrayKey byteArrayKey = ByteArrayKey.wrap(message.getKey().getKey()); keyGroupMap.computeIfAbsent(byteArrayKey, (ignored) -> new ArrayList<>()).add(r); + totalNumOfRecords++; } - }); + } + aggVersionedIngestionStats + .recordBatchProcessingRequest(storeName, version, totalNumOfRecords, System.currentTimeMillis()); + hostLevelIngestionStats.recordBatchProcessingRequest(totalNumOfRecords); + List> futureList = new ArrayList<>(keyGroupMap.size()); keyGroupMap.forEach((ignored, recordsWithTheSameKey) -> { futureList.add(CompletableFuture.runAsync(() -> { @@ -153,7 +177,13 @@ public List= 0) { return offsetFromConsumer; } - try { - return RetryUtils.executeWithMaxAttempt(() -> { - long offset = getTopicManager(kafkaUrl).getLatestOffsetCachedNonBlocking(pubSubTopic, partition); - if (offset == -1) { - throw new VeniceException("Found latest offset -1"); - } - return offset; - }, 5, Duration.ofSeconds(1), Collections.singletonList(VeniceException.class)); - } catch (Exception e) { - LOGGER.error("Could not find latest offset for {} even after 5 retries", pubSubTopic.getName()); - return -1; - } + return getTopicManager(kafkaUrl).getLatestOffsetCached(pubSubTopic, partition); } protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java index 6301af96fb..d31bf4a8cb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java @@ -255,4 +255,19 @@ public void registerTransformerErrorSensor(String storeName, int version) { getStats(storeName, version).registerTransformerErrorSensor(); getTotalStats(storeName).registerTransformerErrorSensor(); } + + public void recordBatchProcessingRequest(String storeName, int version, int size, long timestamp) { + recordVersionedAndTotalStat(storeName, version, stat -> stat.recordBatchProcessingRequest(size, timestamp)); + } + + public void recordBatchProcessingRequestError(String storeName, int version) { + recordVersionedAndTotalStat(storeName, version, stat -> stat.recordBatchProcessingRequestError()); + } + + public void recordBatchProcessingLatency(String storeName, int version, double latency, long timestamp) { + recordVersionedAndTotalStat( + storeName, + version, + stat -> stat.recordBatchProcessingRequestLatency(latency, timestamp)); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java index e3fb9c3dea..661ed0a967 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java @@ -1,5 +1,11 @@ package com.linkedin.davinci.stats; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_ERROR; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_LATENCY; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_RECORDS; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_SIZE; + import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.davinci.kafka.consumer.StoreIngestionTask; @@ -138,6 +144,13 @@ public class HostLevelIngestionStats extends AbstractVeniceStats { */ private final LongAdderRateGauge totalTombstoneCreationDCRRate; + private final Sensor leaderProduceLatencySensor; + private final LongAdderRateGauge batchProcessingRequestSensor; + private final Sensor batchProcessingRequestSizeSensor; + private final LongAdderRateGauge batchProcessingRequestRecordsSensor; + private final Sensor batchProcessingRequestLatencySensor; + private final LongAdderRateGauge batchProcessingRequestErrorSensor; + /** * @param totalStats the total stats singleton instance, or null if we are constructing the total stats */ @@ -435,6 +448,37 @@ public HostLevelIngestionStats( totalStats, () -> totalStats.leaderIngestionActiveActiveDeleteLatencySensor, avgAndMax()); + + this.leaderProduceLatencySensor = registerPerStoreAndTotalSensor( + "leader_produce_latency", + totalStats, + () -> totalStats.leaderProduceLatencySensor, + avgAndMax()); + this.batchProcessingRequestSensor = registerOnlyTotalRate( + BATCH_PROCESSING_REQUEST, + totalStats, + () -> totalStats.batchProcessingRequestSensor, + time); + this.batchProcessingRequestErrorSensor = registerOnlyTotalRate( + BATCH_PROCESSING_REQUEST_ERROR, + totalStats, + () -> totalStats.batchProcessingRequestErrorSensor, + time); + this.batchProcessingRequestRecordsSensor = registerOnlyTotalRate( + BATCH_PROCESSING_REQUEST_RECORDS, + totalStats, + () -> totalStats.batchProcessingRequestRecordsSensor, + time); + this.batchProcessingRequestSizeSensor = registerOnlyTotalSensor( + BATCH_PROCESSING_REQUEST_SIZE, + totalStats, + () -> totalStats.batchProcessingRequestSizeSensor, + avgAndMax()); + this.batchProcessingRequestLatencySensor = registerOnlyTotalSensor( + BATCH_PROCESSING_REQUEST_LATENCY, + totalStats, + () -> totalStats.batchProcessingRequestLatencySensor, + avgAndMax()); } /** Record a host-level byte consumption rate across all store versions */ @@ -614,4 +658,22 @@ public void recordTimestampRegressionDCRError() { public void recordOffsetRegressionDCRError() { totalOffsetRegressionDCRErrorRate.record(); } + + public void recordLeaderProduceLatency(double latency) { + leaderProduceLatencySensor.record(latency); + } + + public void recordBatchProcessingRequest(int size) { + batchProcessingRequestSensor.record(); + batchProcessingRequestRecordsSensor.record(size); + batchProcessingRequestSizeSensor.record(size); + } + + public void recordBatchProcessingRequestError() { + batchProcessingRequestErrorSensor.record(); + } + + public void recordBatchProcessingRequestLatency(double latency) { + batchProcessingRequestLatencySensor.record(latency); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java index 7816f47010..281ee815a8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java @@ -64,6 +64,11 @@ public class IngestionStats { public static final String PRODUCER_CALLBACK_LATENCY = "producer_callback_latency"; public static final String LEADER_PREPROCESSING_LATENCY = "leader_preprocessing_latency"; public static final String INTERNAL_PREPROCESSING_LATENCY = "internal_preprocessing_latency"; + public static final String BATCH_PROCESSING_REQUEST = "batch_processing_request"; + public static final String BATCH_PROCESSING_REQUEST_SIZE = "batch_processing_request_size"; + public static final String BATCH_PROCESSING_REQUEST_RECORDS = "batch_processing_request_records"; + public static final String BATCH_PROCESSING_REQUEST_LATENCY = "batch_processing_request_latency"; + public static final String BATCH_PROCESSING_REQUEST_ERROR = "batch_processing_request_error"; private static final MetricConfig METRIC_CONFIG = new MetricConfig(); private StoreIngestionTask ingestionTask; @@ -117,6 +122,11 @@ public class IngestionStats { private Count transformerErrorCount = new Count(); private Sensor transformerErrorSensor; + private final LongAdderRateGauge batchProcessingRequestSensor = new LongAdderRateGauge(); + private final WritePathLatencySensor batchProcessingRequestSizeSensor; + private final LongAdderRateGauge batchProcessingRequestRecordsSensor = new LongAdderRateGauge(); + private final WritePathLatencySensor batchProcessingRequestLatencySensor; + private final LongAdderRateGauge batchProcessingRequestErrorSensor = new LongAdderRateGauge(); public IngestionStats(VeniceServerConfig serverConfig) { @@ -207,6 +217,14 @@ public IngestionStats(VeniceServerConfig serverConfig) { registerSensor(localMetricRepository, OFFSET_REGRESSION_DCR_ERROR, offsetRegressionDCRErrorSensor); registerSensor(localMetricRepository, TOMBSTONE_CREATION_DCR, tombstoneCreationDCRSensor); registerSensor(localMetricRepository, IDLE_TIME, idleTimeSensor); + + registerSensor(localMetricRepository, BATCH_PROCESSING_REQUEST, batchProcessingRequestSensor); + registerSensor(localMetricRepository, BATCH_PROCESSING_REQUEST_RECORDS, batchProcessingRequestRecordsSensor); + registerSensor(localMetricRepository, BATCH_PROCESSING_REQUEST_ERROR, batchProcessingRequestErrorSensor); + batchProcessingRequestSizeSensor = + new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, BATCH_PROCESSING_REQUEST_SIZE); + batchProcessingRequestLatencySensor = + new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, BATCH_PROCESSING_REQUEST_LATENCY); } private void registerSensor(MetricsRepository localMetricRepository, String sensorName, LongAdderRateGauge gauge) { @@ -665,6 +683,40 @@ public WritePathLatencySensor getLeaderProducerCompletionLatencySensor() { return leaderProducerCompletionLatencySensor; } + public void recordBatchProcessingRequest(int size, long currentTimeMs) { + batchProcessingRequestSensor.record(); + batchProcessingRequestRecordsSensor.record(size); + batchProcessingRequestSizeSensor.record(size, currentTimeMs); + } + + public double getBatchProcessingRequest() { + return batchProcessingRequestSensor.getRate(); + } + + public double getBatchProcessingRequestRecords() { + return batchProcessingRequestRecordsSensor.getRate(); + } + + public void recordBatchProcessingRequestError() { + 
batchProcessingRequestErrorSensor.record(); + } + + public double getBatchProcessingRequestError() { + return batchProcessingRequestErrorSensor.getRate(); + } + + public WritePathLatencySensor getBatchProcessingRequestSizeSensor() { + return batchProcessingRequestSizeSensor; + } + + public void recordBatchProcessingRequestLatency(double latency, long currentTimeMs) { + batchProcessingRequestLatencySensor.record(latency, currentTimeMs); + } + + public WritePathLatencySensor getBatchProcessingRequestLatencySensor() { + return batchProcessingRequestLatencySensor; + } + public static double unAvailableToZero(double value) { /* When data is unavailable, return 0 instead of NaN or Infinity. Some metrics are initialized to -INF. This can cause problems when metrics are aggregated. Use only when zero makes semantic sense. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java index 9630437ed1..e88f39e5c9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java @@ -2,6 +2,11 @@ import static com.linkedin.davinci.stats.IngestionStats.BATCH_FOLLOWER_OFFSET_LAG; import static com.linkedin.davinci.stats.IngestionStats.BATCH_LEADER_OFFSET_LAG; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_ERROR; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_LATENCY; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_RECORDS; +import static com.linkedin.davinci.stats.IngestionStats.BATCH_PROCESSING_REQUEST_SIZE; import static com.linkedin.davinci.stats.IngestionStats.BATCH_REPLICATION_LAG; import static com.linkedin.davinci.stats.IngestionStats.BYTES_CONSUMED_METRIC_NAME; import static com.linkedin.davinci.stats.IngestionStats.CONSUMED_RECORD_END_TO_END_PROCESSING_LATENCY; @@ -226,6 +231,44 @@ protected void registerStats() { "local_broker_to_follower_consumer", IngestionStats::getLocalBrokerFollowerConsumerLatencySensor); registerLatencySensor("leader_producer_completion", IngestionStats::getLeaderProducerCompletionLatencySensor); + + registerSensor( + new IngestionStatsGauge(this, () -> getStats().getBatchProcessingRequest(), 0, BATCH_PROCESSING_REQUEST)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBatchProcessingRequestError(), + BATCH_PROCESSING_REQUEST_ERROR)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBatchProcessingRequestRecords(), + 0, + BATCH_PROCESSING_REQUEST_RECORDS)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBatchProcessingRequestSizeSensor().getAvg(), + 0, + BATCH_PROCESSING_REQUEST_SIZE + "_avg")); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBatchProcessingRequestSizeSensor().getMax(), + 0, + BATCH_PROCESSING_REQUEST_SIZE + "_max")); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBatchProcessingRequestLatencySensor().getAvg(), + 0, + BATCH_PROCESSING_REQUEST_LATENCY + "_avg")); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBatchProcessingRequestLatencySensor().getMax(), + 0, + BATCH_PROCESSING_REQUEST_LATENCY + "_max")); } } diff --git 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index afedb84fe1..86c4a32b77 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -315,8 +316,9 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio long beforeProcessingRecordTimestamp = 0; boolean resultReuseInput = true; + HostLevelIngestionStats mockHostLevelIngestionStats = mock(HostLevelIngestionStats.class); ActiveActiveStoreIngestionTask ingestionTask = mock(ActiveActiveStoreIngestionTask.class); - when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mock(HostLevelIngestionStats.class)); + when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mockHostLevelIngestionStats); when(ingestionTask.getVersionIngestionStats()).thenReturn(mock(AggVersionedIngestionStats.class)); when(ingestionTask.getVersionedDIVStats()).thenReturn(mock(AggVersionedDIVStats.class)); when(ingestionTask.getKafkaVersionTopic()).thenReturn(testTopic); @@ -452,6 +454,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio Assert.assertEquals( leaderProducedRecordContextArgumentCaptor.getAllValues().get(3).getKeyBytes(), kafkaKeyArgumentCaptor.getAllValues().get(4).getKey()); + verify(mockHostLevelIngestionStats).recordLeaderProduceLatency(anyDouble()); } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java index 9225980610..768e3ae364 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java @@ -1,15 +1,22 @@ package com.linkedin.davinci.kafka.consumer; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import com.linkedin.alpini.base.concurrency.ExecutorService; import com.linkedin.alpini.base.concurrency.Executors; +import com.linkedin.davinci.stats.AggVersionedIngestionStats; +import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.utils.ByteArrayKey; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.Put; import 
com.linkedin.venice.kafka.protocol.enums.MessageType; @@ -106,7 +113,9 @@ public void lockKeysTest() { mockKeyLevelLocksManager, (ignored1, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> null, true, - true); + true, + mock(AggVersionedIngestionStats.class), + mock(HostLevelIngestionStats.class)); List locks = batchProcessor.lockKeys(Arrays.asList(rtMessage1, rtMessage2)); verify(mockKeyLevelLocksManager).acquireLockByKey(ByteArrayKey.wrap(key1)); verify(mockKeyLevelLocksManager).acquireLockByKey(ByteArrayKey.wrap(key2)); @@ -146,6 +155,9 @@ public void processTest() { 101, 100); + AggVersionedIngestionStats mockAggVersionedIngestionStats = mock(AggVersionedIngestionStats.class); + HostLevelIngestionStats mockHostLevelIngestionStats = mock(HostLevelIngestionStats.class); + IngestionBatchProcessor batchProcessor = new IngestionBatchProcessor( "store_v1", Executors.newFixedThreadPool(1, new DaemonThreadFactory("test")), @@ -165,7 +177,9 @@ public void processTest() { return null; }, true, - true); + true, + mockAggVersionedIngestionStats, + mockHostLevelIngestionStats); List> result = batchProcessor.process( Arrays.asList(rtMessage1, rtMessage2), @@ -185,6 +199,45 @@ public void processTest() { assertEquals( resultForKey2.getProcessedResult().getWriteComputeResultWrapper().getNewPut().putValue.array(), "value2".getBytes()); + verify(mockAggVersionedIngestionStats).recordBatchProcessingRequest(eq("store"), eq(1), eq(2), anyLong()); + verify(mockAggVersionedIngestionStats).recordBatchProcessingLatency(eq("store"), eq(1), anyDouble(), anyLong()); + verify(mockHostLevelIngestionStats).recordBatchProcessingRequest(2); + verify(mockHostLevelIngestionStats).recordBatchProcessingRequestLatency(anyDouble()); + + // Error path + batchProcessor = new IngestionBatchProcessor( + "store_v1", + Executors.newFixedThreadPool(1, new DaemonThreadFactory("test")), + mockKeyLevelLocksManager, + (consumerRecord, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> { + if (Arrays.equals(consumerRecord.getKey().getKey(), "key1".getBytes())) { + Put put = new Put(); + put.setPutValue(ByteBuffer.wrap("value1".getBytes())); + WriteComputeResultWrapper writeComputeResultWrapper = new WriteComputeResultWrapper(put, null, true); + return new PubSubMessageProcessedResult(writeComputeResultWrapper); + } else if (Arrays.equals(consumerRecord.getKey().getKey(), "key2".getBytes())) { + throw new VeniceException("Fake"); + } + return null; + }, + true, + true, + mockAggVersionedIngestionStats, + mockHostLevelIngestionStats); + final IngestionBatchProcessor finalBatchProcessor = batchProcessor; + VeniceException exception = expectThrows( + VeniceException.class, + () -> finalBatchProcessor.process( + Arrays.asList(rtMessage1, rtMessage2), + mock(PartitionConsumptionState.class), + 1, + "test_kafka", + 1, + 1, + 1)); + assertTrue(exception.getMessage().contains("Failed to execute the batch processing")); + verify(mockAggVersionedIngestionStats).recordBatchProcessingRequestError("store", 1); + verify(mockHostLevelIngestionStats).recordBatchProcessingRequestError(); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index d02e59bd9c..a5b5969a80 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -3017,9 +3017,6 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica doReturn(5L).when(mockTopicManager).getLatestOffsetCached(any(), anyInt()); doReturn(150L).when(mockTopicManagerRemoteKafka).getLatestOffsetCached(any(), anyInt()); doReturn(150L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any()); - long endOffset = - storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 1); - assertEquals(endOffset, 150L); if (nodeType == NodeType.LEADER) { // case 6a: leader replica => partition is not ready to serve doReturn(LeaderFollowerStateType.LEADER).when(mockPcsBufferReplayStartedRemoteLagging).getLeaderFollowerState(); @@ -3179,15 +3176,6 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT } else { assertTrue(storeIngestionTaskUnderTest.isReadyToServe(mockPcsMultipleSourceKafkaServers)); } - doReturn(10L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any()); - long endOffset = - storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0); - assertEquals(endOffset, 10L); - doReturn(-1L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any()); - endOffset = - storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0); - assertEquals(endOffset, 0L); - } @DataProvider diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java index 1aad9bc3b8..9bbf2dee89 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java @@ -201,6 +201,19 @@ protected Sensor registerPerStoreAndTotalSensor( return registerSensor(sensorName, parent, stats); } + protected Sensor registerOnlyTotalSensor( + String sensorName, + AbstractVeniceStats totalStats, + Supplier totalSensor, + MeasurableStat... stats) { + + if (totalStats == null) { + return registerSensor(sensorName, stats); + } else { + return totalSensor.get(); + } + } + /** * Only register sensor for total stats. If not provided, create a new one. 
* @param sensorName diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java index 7d3340ed27..fe96a6cb5b 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java @@ -262,10 +262,9 @@ public static SchemaEntry generateSupersetSchemaFromAllValueSchemas(Collection combinedSchema = new ArrayList<>(); - Map s2Schema = s2.getTypes().stream().collect(Collectors.toMap(s -> s.getName(), s -> s)); - for (Schema subSchemaInS1: s1.getTypes()) { - final String fieldName = subSchemaInS1.getName(); - final Schema subSchemaWithSameNameInS2 = s2Schema.get(fieldName); - if (subSchemaWithSameNameInS2 == null) { - combinedSchema.add(subSchemaInS1); + Map existingSchemaTypeMap = + existingSchema.getTypes().stream().collect(Collectors.toMap(Schema::getName, s -> s)); + for (Schema subSchemaInNewSchema: newSchema.getTypes()) { + final String fieldName = subSchemaInNewSchema.getName(); + final Schema subSchemaInExistingSchema = existingSchemaTypeMap.get(fieldName); + if (subSchemaInExistingSchema == null) { + combinedSchema.add(subSchemaInNewSchema); } else { - combinedSchema.add(generateSuperSetSchema(subSchemaInS1, subSchemaWithSameNameInS2)); - s2Schema.remove(fieldName); + combinedSchema.add(generateSupersetSchema(subSchemaInExistingSchema, subSchemaInNewSchema)); + existingSchemaTypeMap.remove(fieldName); } } - s2Schema.forEach((k, v) -> combinedSchema.add(v)); - + existingSchemaTypeMap.forEach((k, v) -> combinedSchema.add(v)); return Schema.createUnion(combinedSchema); } @@ -135,25 +140,31 @@ private static FieldBuilder deepCopySchemaField(Schema.Field field) { return fieldBuilder; } - private static List mergeFieldSchemas(Schema s1, Schema s2) { + /** + * Merge field schema from two schema object. The rule is: If a field exist in both new schema and old schema, we should + * generate the superset schema of these two versions of the same field, with new schema's information taking higher + * priority. + * @param newSchema new schema + * @param existingSchema old schema + * @return merged schema field + */ + private static List mergeFieldSchemas(Schema existingSchema, Schema newSchema) { List fields = new ArrayList<>(); - for (Schema.Field f1: s1.getFields()) { - Schema.Field f2 = s2.getField(f1.name()); + for (Schema.Field fieldInNewSchema: newSchema.getFields()) { + Schema.Field fieldInExistingSchema = existingSchema.getField(fieldInNewSchema.name()); - FieldBuilder fieldBuilder = deepCopySchemaField(f1); - if (f2 != null) { - fieldBuilder.setSchema(generateSuperSetSchema(f1.schema(), f2.schema())) - .setDoc(f1.doc() != null ? f1.doc() : f2.doc()); - // merge props from f2 - copyFieldProperties(fieldBuilder, f2); + FieldBuilder fieldBuilder = deepCopySchemaField(fieldInNewSchema); + if (fieldInExistingSchema != null) { + fieldBuilder.setSchema(generateSupersetSchema(fieldInExistingSchema.schema(), fieldInNewSchema.schema())) + .setDoc(fieldInNewSchema.doc() != null ? 
fieldInNewSchema.doc() : fieldInExistingSchema.doc()); } fields.add(fieldBuilder.build()); } - for (Schema.Field f2: s2.getFields()) { - if (s1.getField(f2.name()) == null) { - fields.add(deepCopySchemaField(f2).build()); + for (Schema.Field fieldInExistingSchema: existingSchema.getFields()) { + if (newSchema.getField(fieldInExistingSchema.name()) == null) { + fields.add(deepCopySchemaField(fieldInExistingSchema).build()); } } return fields; diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/VeniceProperties.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/VeniceProperties.java index dfafe3ef15..e9855ece93 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/VeniceProperties.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/VeniceProperties.java @@ -389,6 +389,7 @@ public List getList(String key, List defaultValue) { } String value = get(key); + String[] pieces = value.split("\\s*,\\s*"); return Arrays.asList(pieces); } diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java index ed8766d851..cdcc39d553 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java @@ -6,6 +6,7 @@ import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V4_SCHEMA; import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V5_SCHEMA; import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V6_SCHEMA; +import static com.linkedin.venice.utils.TestWriteUtils.loadFileAsString; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.venice.controllerapi.MultiSchemaResponse; @@ -36,7 +37,7 @@ public void testGenerateSupersetSchemaFromValueSchemasWithTwoSchemas() { AvroSchemaUtils.generateSupersetSchemaFromAllValueSchemas(Arrays.asList(schemaEntry1, schemaEntry2)); final Schema expectedSupersetSchema = - AvroSupersetSchemaUtils.generateSuperSetSchema(schemaEntry1.getSchema(), schemaEntry2.getSchema()); + AvroSupersetSchemaUtils.generateSupersetSchema(schemaEntry1.getSchema(), schemaEntry2.getSchema()); Assert.assertTrue( AvroSchemaUtils.compareSchemaIgnoreFieldOrder(expectedSupersetSchema, supersetSchemaEntry.getSchema())); Assert.assertEquals(supersetSchemaEntry.getId(), 2); @@ -142,7 +143,7 @@ public void testSupersetSchemaDefaultCompatibility() { Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr1); Schema existingValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr2); - Schema newSuperSetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(existingValueSchema, newValueSchema); + Schema newSuperSetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(existingValueSchema, newValueSchema); Assert.assertTrue( new SchemaEntry(1, valueSchemaStr2) .isNewSchemaCompatible(new SchemaEntry(2, newSuperSetSchema), DirectionalSchemaCompatibilityType.FULL)); @@ -161,7 +162,7 @@ public void testStringVsAvroString() { Assert.assertNotEquals(s1, s2); Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s2, s1); + Schema s3 = 
AvroSupersetSchemaUtils.generateSupersetSchema(s2, s1); Assert.assertNotNull(s3); Assert.assertNotNull( AvroCompatibilityHelper.getSchemaPropAsJsonString(s3.getField("name").schema(), "avro.java.string")); @@ -177,7 +178,7 @@ public void testWithDifferentDocField() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3); } @@ -191,7 +192,7 @@ public void testSchemaMerge() { Schema s1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr1); Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3); } @@ -206,7 +207,7 @@ public void testSchemaMergeFields() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3.getField("id1")); Assert.assertNotNull(s3.getField("id2")); } @@ -222,7 +223,7 @@ public void testSchemaMergeFieldsBadDefaults() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3.getField("id1")); Assert.assertNotNull(s3.getField("id2")); } @@ -237,7 +238,7 @@ public void testWithIncompatibleSchema() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); } @Test @@ -251,11 +252,26 @@ public void testSchemaMergeUnion() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3.getField("company")); Assert.assertNotNull(s3.getField("organization")); } + @Test + public void testSchemaMergeUnionWithComplexItemType() { + Schema s1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(loadFileAsString("UnionV1.avsc")); + Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(loadFileAsString("UnionV2.avsc")); + Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); + Assert.assertNotNull(s3.getField("age")); + Assert.assertNotNull(s3.getField("field")); + Schema.Field subFieldInS2 = s2.getField("field"); + Schema.Field subFieldInS3 = s3.getField("field"); + Schema unionSubFieldInS2 = subFieldInS2.schema().getTypes().get(1); + Schema unionSubFieldInS3 = subFieldInS3.schema().getTypes().get(1); + Assert.assertEquals(unionSubFieldInS3, 
unionSubFieldInS2); + } + @Test public void testWithNewFieldArrayRecord() { String recordSchemaStr1 = "{\n" + " \"type\" : \"record\",\n" + " \"name\" : \"testRecord\",\n" @@ -280,7 +296,7 @@ public void testWithNewFieldArrayRecord() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(recordSchemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3); } @@ -309,7 +325,7 @@ public void tesMergeWithDefaultValueUpdate() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(AvroSchemaUtils.getFieldDefault(s3.getField("salary"))); } @@ -333,7 +349,7 @@ public void testWithEnumEvolution() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); } @Test @@ -425,8 +441,7 @@ public void testSupersetSchemaContainsMergeFieldProps() { Schema schema1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr1); Schema schema2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr2); - - Schema supersetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(schema1, schema2); + Schema supersetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(schema1, schema2); Schema.Field intField = supersetSchema.getField("int_field"); Schema.Field stringField = supersetSchema.getField("string_field"); @@ -496,7 +511,7 @@ public void testValidateSubsetSchema() { Assert.assertNotEquals(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V6_SCHEMA); // Test validation skip comparing props when checking for subset schema. 
Schema supersetSchemaForV5AndV4 = - AvroSupersetSchemaUtils.generateSuperSetSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V4_SCHEMA); + AvroSupersetSchemaUtils.generateSupersetSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V4_SCHEMA); Assert.assertTrue( AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V5_SCHEMA, supersetSchemaForV5AndV4.toString())); Assert.assertTrue( diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java index 3627a1c5c7..9531662e10 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java @@ -6,6 +6,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import com.linkedin.venice.client.stats.BasicClientStats; @@ -228,4 +229,20 @@ public void testRegisterOnlyTotalRate() { sensor = stats.registerOnlyTotalRate("testSensor", null, () -> parentCount, SystemTime.INSTANCE); Assert.assertNotEquals(sensor, parentCount); } + + @Test + public void testRegisterOnlyTotalSensor() { + MetricsRepository metricsRepository = new MetricsRepository(); + + AbstractVeniceStats stats = new AbstractVeniceStats(metricsRepository, "testStore"); + AbstractVeniceStats totalStats = new AbstractVeniceStats(metricsRepository, "total"); + Sensor totalSensor = totalStats.registerSensor("testSensor", new OccurrenceRate()); + // 1) total stats is not null so use ths supplier + Sensor sensor = stats.registerOnlyTotalSensor("testSensor", totalStats, () -> totalSensor, new OccurrenceRate()); + assertEquals(sensor, totalSensor); + + // 2) total stats is null, so created a new one + Sensor newTotalSensor = stats.registerOnlyTotalSensor("testSensor", null, () -> totalSensor, new OccurrenceRate()); + assertNotEquals(newTotalSensor, totalSensor); + } } diff --git a/internal/venice-client-common/src/test/resources/UnionV1.avsc b/internal/venice-client-common/src/test/resources/UnionV1.avsc new file mode 100644 index 0000000000..81cd2b7006 --- /dev/null +++ b/internal/venice-client-common/src/test/resources/UnionV1.avsc @@ -0,0 +1,31 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "field", + "type": [ + "int", + { + "type": "record", + "name": "subField", + "fields": [ + { + "name": "name", + "type": "string", + "doc": "doc v1", + "default": "v1" + } + ] + } + ], + "default": 10 + }, + { + "name": "age", + "type": "int", + "default": 10 + } + ] +} diff --git a/internal/venice-client-common/src/test/resources/UnionV2.avsc b/internal/venice-client-common/src/test/resources/UnionV2.avsc new file mode 100644 index 0000000000..05288cf0d4 --- /dev/null +++ b/internal/venice-client-common/src/test/resources/UnionV2.avsc @@ -0,0 +1,26 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "field", + "type": [ + "int", + { + "type": "record", + "name": "subField", + "fields": [ + { + "name": "name", + "type": "string", + "doc": "doc v2", + "default": "v2" + } + ] + } + ], + "default": 20 + } + ] +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java 
b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java index c91d3c452e..1c3c0ed915 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java @@ -731,11 +731,6 @@ public long getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId) { return topicMetadataFetcher.getLatestOffsetCached(new PubSubTopicPartitionImpl(pubSubTopic, partitionId)); } - public long getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId) { - return topicMetadataFetcher - .getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, partitionId)); - } - public long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries) { return topicMetadataFetcher.getProducerTimestampOfLastDataMessageWithRetries(pubSubTopicPartition, retries); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java index 4a4e7ac85e..9c8e80f4ac 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java @@ -379,25 +379,6 @@ CompletableFuture getLatestOffsetWithRetriesAsync(PubSubTopicPartition pub .supplyAsync(() -> getLatestOffsetWithRetries(pubSubTopicPartition, retries), threadPoolExecutor); } - long getLatestOffsetCachedNonBlocking(PubSubTopicPartition pubSubTopicPartition) { - ValueAndExpiryTime cachedValue; - cachedValue = latestOffsetCache.get(pubSubTopicPartition); - updateCacheAsync( - pubSubTopicPartition, - cachedValue, - latestOffsetCache, - () -> getLatestOffsetWithRetriesAsync( - pubSubTopicPartition, - DEFAULT_MAX_RETRIES_FOR_POPULATING_TMD_CACHE_ENTRY)); - if (cachedValue == null) { - cachedValue = latestOffsetCache.get(pubSubTopicPartition); - if (cachedValue == null) { - return -1; - } - } - return cachedValue.getValue(); - } - long getLatestOffsetCached(PubSubTopicPartition pubSubTopicPartition) { ValueAndExpiryTime cachedValue; try { diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java index bca7d15629..214be223c5 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java @@ -260,9 +260,6 @@ public void testGetTopicLatestOffsets() { assertEquals(res.size(), offsetsMap.size()); assertEquals(res.get(0), 111L); assertEquals(res.get(1), 222L); - assertEquals( - topicMetadataFetcher.getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, 0)), - -1); verify(consumerMock, times(3)).partitionsFor(pubSubTopic); verify(consumerMock, times(1)).endOffsets(eq(offsetsMap.keySet()), any(Duration.class)); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHAASController.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHAASController.java index 796edb2aa2..905a853693 100644 --- 
a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHAASController.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHAASController.java @@ -78,6 +78,27 @@ public void testClusterResourceInstanceTag() { } } + @Test(timeOut = 60 * Time.MS_PER_SECOND) + public void testClusterResourceEmptyInstanceTag() { + try (VeniceClusterWrapper venice = ServiceFactory.getVeniceCluster(0, 0, 0, 1); + HelixAsAServiceWrapper helixAsAServiceWrapper = startAndWaitForHAASToBeAvailable(venice.getZk().getAddress())) { + String instanceTag = ""; + String controllerClusterName = "venice-controllers"; + + Properties clusterProperties = (Properties) enableControllerAndStorageClusterHAASProperties.clone(); + clusterProperties.put(ConfigKeys.CONTROLLER_RESOURCE_INSTANCE_GROUP_TAG, instanceTag); + clusterProperties.put(ConfigKeys.CONTROLLER_INSTANCE_TAG_LIST, instanceTag); + + VeniceControllerWrapper controllerWrapper = venice.addVeniceController(clusterProperties); + + HelixAdmin helixAdmin = controllerWrapper.getVeniceHelixAdmin().getHelixAdmin(); + List resources = helixAdmin.getResourcesInClusterWithTag(controllerClusterName, instanceTag); + assertEquals(resources.size(), 0); + List instances = helixAdmin.getInstancesInClusterWithTag(controllerClusterName, instanceTag); + assertEquals(instances.size(), 0); + } + } + @Test(timeOut = 60 * Time.MS_PER_SECOND) public void testStartHAASHelixControllerAsControllerClusterLeader() { try (VeniceClusterWrapper venice = ServiceFactory.getVeniceCluster(0, 0, 0, 1); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java index 7ca1f0a7d6..af14b0eb21 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java @@ -598,6 +598,7 @@ public void testStoreMetaDataUpdateFromParentToChildController( testWriteComputeSchemaAutoGeneration(parentControllerClient); testWriteComputeSchemaEnable(parentControllerClient); testWriteComputeSchemaAutoGenerationFailure(parentControllerClient); + testSupersetSchemaGenerationWithUpdateDefaultValue(parentControllerClient); testUpdateConfigs(parentControllerClient, childControllerClient); } } @@ -1132,6 +1133,48 @@ private void testWriteComputeSchemaEnable(ControllerClient parentControllerClien Assert.assertEquals(registeredWriteComputeSchema.size(), 1); } + private void testSupersetSchemaGenerationWithUpdateDefaultValue(ControllerClient parentControllerClient) { + String storeName = Utils.getUniqueString("test_store"); + String owner = "test_owner"; + String keySchemaStr = "\"long\""; + + // Step 1. 
Create a store with missing default fields schema + parentControllerClient + .createNewStore(storeName, owner, keySchemaStr, TestWriteUtils.UNION_RECORD_V1_SCHEMA.toString()); + MultiSchemaResponse valueAndWriteComputeSchemaResponse = + parentControllerClient.getAllValueAndDerivedSchema(storeName); + MultiSchemaResponse.Schema[] registeredSchemas = valueAndWriteComputeSchemaResponse.getSchemas(); + Assert.assertEquals(registeredSchemas.length, 1); + MultiSchemaResponse.Schema registeredSchema = registeredSchemas[0]; + Assert.assertFalse(registeredSchema.isDerivedSchema()); // No write compute schema yet. + + // Step 2. Update this store to enable write compute. + UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams(); + updateStoreQueryParams.setWriteComputationEnabled(true); + parentControllerClient.updateStore(storeName, updateStoreQueryParams); + + // Could not enable write compute bad schema did not have defaults + StoreInfo store = parentControllerClient.getStore(storeName).getStore(); + Assert.assertTrue(store.isWriteComputationEnabled()); + Assert.assertEquals(store.getLatestSuperSetValueSchemaId(), 1); + + // Step 3. Add a valid latest value schema for write-compute + parentControllerClient.addValueSchema(storeName, TestWriteUtils.UNION_RECORD_V2_SCHEMA.toString()); + TestUtils.waitForNonDeterministicAssertion( + 30, + TimeUnit.SECONDS, + () -> Assert + .assertEquals(parentControllerClient.getStore(storeName).getStore().getLatestSuperSetValueSchemaId(), 2)); + + parentControllerClient.addValueSchema(storeName, TestWriteUtils.UNION_RECORD_V3_SCHEMA.toString()); + TestUtils.waitForNonDeterministicAssertion( + 30, + TimeUnit.SECONDS, + () -> Assert + .assertEquals(parentControllerClient.getStore(storeName).getStore().getLatestSuperSetValueSchemaId(), 3)); + + } + private List getWriteComputeSchemaStrs(MultiSchemaResponse.Schema[] registeredSchemas) { List writeComputeSchemaStrs = new ArrayList<>(); for (MultiSchemaResponse.Schema schema: registeredSchemas) { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java index 3a5db98724..15ae067ee4 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java @@ -136,7 +136,8 @@ public void testEmptyPushByChangingCompressionStrategyForHybridStore() throws IO PubSubTopic storeRealTimeTopic = venice.getPubSubTopicRepository().getTopic(Version.composeRealTimeTopic(storeName)); assertTrue(topicManager.containsTopicAndAllPartitionsAreOnline(storeRealTimeTopic)); - + // One time refresh of router metadata. 
+ venice.refreshAllRouterMetaData(); // Start writing some real-time records SystemProducer veniceProducer = IntegrationTestPushUtils.getSamzaProducer(venice, storeName, Version.PushType.STREAM); diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java index 16ef0b5a48..62280a1dae 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java @@ -100,6 +100,13 @@ public class TestWriteUtils { public static final Schema NAME_RECORD_V6_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV6.avsc")); + public static final Schema UNION_RECORD_V1_SCHEMA = + AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/UnionV1.avsc")); + public static final Schema UNION_RECORD_V2_SCHEMA = + AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/UnionV2.avsc")); + public static final Schema UNION_RECORD_V3_SCHEMA = + AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/UnionV3.avsc")); + // ETL Schema public static final Schema ETL_KEY_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("etl/Key.avsc")); public static final Schema ETL_VALUE_SCHEMA = diff --git a/internal/venice-test-common/src/main/resources/valueSchema/UnionV1.avsc b/internal/venice-test-common/src/main/resources/valueSchema/UnionV1.avsc new file mode 100644 index 0000000000..1dbc85a391 --- /dev/null +++ b/internal/venice-test-common/src/main/resources/valueSchema/UnionV1.avsc @@ -0,0 +1,15 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "count", + "type": [ + "int", + "null" + ], + "default": 0 + } + ] +} diff --git a/internal/venice-test-common/src/main/resources/valueSchema/UnionV2.avsc b/internal/venice-test-common/src/main/resources/valueSchema/UnionV2.avsc new file mode 100644 index 0000000000..e1b349247e --- /dev/null +++ b/internal/venice-test-common/src/main/resources/valueSchema/UnionV2.avsc @@ -0,0 +1,15 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "count", + "type": [ + "null", + "int" + ], + "default": null + } + ] +} diff --git a/internal/venice-test-common/src/main/resources/valueSchema/UnionV3.avsc b/internal/venice-test-common/src/main/resources/valueSchema/UnionV3.avsc new file mode 100644 index 0000000000..0912be4c31 --- /dev/null +++ b/internal/venice-test-common/src/main/resources/valueSchema/UnionV3.avsc @@ -0,0 +1,20 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "count", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "dummyField", + "type" : "string", + "default" : "" + } + ] +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index e0c1881e05..7d390d3b99 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -654,7 +654,13 @@ public VeniceControllerClusterConfig(VeniceProperties props) { 
this.adminCheckReadMethodForKafka = props.getBoolean(ADMIN_CHECK_READ_METHOD_FOR_KAFKA, true); this.controllerClusterName = props.getString(CONTROLLER_CLUSTER, "venice-controllers"); this.controllerResourceInstanceGroupTag = props.getString(CONTROLLER_RESOURCE_INSTANCE_GROUP_TAG, ""); - this.controllerInstanceTagList = props.getList(CONTROLLER_INSTANCE_TAG_LIST, Collections.emptyList()); + + if (props.getString(CONTROLLER_INSTANCE_TAG_LIST, "").isEmpty()) { + this.controllerInstanceTagList = Collections.emptyList(); + } else { + this.controllerInstanceTagList = props.getList(CONTROLLER_INSTANCE_TAG_LIST, Collections.emptyList()); + } + this.controllerClusterReplica = props.getInt(CONTROLLER_CLUSTER_REPLICA, 3); this.controllerClusterZkAddress = props.getString(CONTROLLER_CLUSTER_ZK_ADDRESSS, getZkAddress()); this.parent = props.getBoolean(CONTROLLER_PARENT_MODE, false); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index aaf3fa1ed8..32331b5dea 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -3097,7 +3097,6 @@ public SchemaEntry addValueSchema( SupersetSchemaGenerator supersetSchemaGenerator = getSupersetSchemaGenerator(clusterName); Schema newSuperSetSchema = supersetSchemaGenerator.generateSupersetSchema(existingValueSchema, newValueSchema); String newSuperSetSchemaStr = newSuperSetSchema.toString(); - if (supersetSchemaGenerator.compareSchema(newSuperSetSchema, newValueSchema)) { doUpdateSupersetSchemaID = true; @@ -3143,7 +3142,6 @@ public SchemaEntry addValueSchema( } else { doUpdateSupersetSchemaID = false; } - SchemaEntry addedSchemaEntry = addValueSchemaEntry(clusterName, storeName, newValueSchemaStr, schemaId, doUpdateSupersetSchemaID); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java index 0696a37fc9..0a5557aaf3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java @@ -20,6 +20,6 @@ public boolean compareSchema(Schema s1, Schema s2) { @Override public Schema generateSupersetSchema(Schema existingSchema, Schema newSchema) { - return AvroSupersetSchemaUtils.generateSuperSetSchema(existingSchema, newSchema); + return AvroSupersetSchemaUtils.generateSupersetSchema(existingSchema, newSchema); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java index 92519628b5..f29fcd612c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java @@ -76,7 +76,7 @@ public boolean compareSchema(Schema s1, Schema 
s2) { @Override public Schema generateSupersetSchema(Schema existingSchema, Schema newSchema) { - Schema supersetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(existingSchema, newSchema); + Schema supersetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(existingSchema, newSchema); String customPropInNewSchema = newSchema.getProp(customProp); if (customPropInNewSchema != null && supersetSchema.getProp(customProp) == null) { Schema newSupersetSchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(supersetSchema.toString()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java index dff7e87124..0524106839 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java @@ -224,7 +224,6 @@ void checkHeartbeatFromSystemStores( } long retrievedHeartbeatTimestamp = getHeartbeatFromSystemStore(clusterName, entry.getKey()); - LOGGER.info("DEBUGGING: {} {} {}", entry.getKey(), entry.getValue(), retrievedHeartbeatTimestamp); if (retrievedHeartbeatTimestamp < entry.getValue()) { newUnhealthySystemStoreSet.add(entry.getKey()); if (retrievedHeartbeatTimestamp == -1) {
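
For context on the main behavioral change above, the sketch below summarizes how the new batch-processing metrics are expected to be recorded around `IngestionBatchProcessor#process()`. The control flow is paraphrased rather than copied from the patch; the stat method names (`recordBatchProcessingRequest`, `recordBatchProcessingLatency`, `recordBatchProcessingRequestError` and their `HostLevelIngestionStats` counterparts) and the "Failed to execute the batch processing" message are taken from the diff and its tests, while the manual latency computation stands in for whatever `LatencyUtils` helper the production code actually uses.

```java
// Sketch (not the literal patch code) of the recording flow inside process().
// totalNumOfRecords counts the non-control-message records grouped by key, as in the patched loop.
long startTimeMs = System.currentTimeMillis();
aggVersionedIngestionStats.recordBatchProcessingRequest(storeName, version, totalNumOfRecords, startTimeMs);
hostLevelIngestionStats.recordBatchProcessingRequest(totalNumOfRecords);
try {
  // ... submit one CompletableFuture per key group to batchProcessingThreadPool and wait for all of them ...
  double latencyMs = System.currentTimeMillis() - startTimeMs; // the patch imports LatencyUtils, presumably for this
  aggVersionedIngestionStats.recordBatchProcessingLatency(storeName, version, latencyMs, System.currentTimeMillis());
  hostLevelIngestionStats.recordBatchProcessingRequestLatency(latencyMs);
} catch (Exception e) {
  aggVersionedIngestionStats.recordBatchProcessingRequestError(storeName, version);
  hostLevelIngestionStats.recordBatchProcessingRequestError();
  throw new VeniceException("Failed to execute the batch processing", e);
}
```

Both the per-version stats (via `AggVersionedIngestionStats`) and the host-level totals are recorded for the same event, which is what the new assertions in `IngestionBatchProcessorTest` verify on the success and error paths.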
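
The rename from `generateSuperSetSchema` to `generateSupersetSchema`, together with the union-merge change, is exercised by the new `UnionV1.avsc`/`UnionV2.avsc` resources under the venice-client-common test resources. A minimal usage sketch, assuming those two files are available locally and using the plain Avro parser in place of Venice's `AvroSchemaParseUtils`:

```java
// Sketch: merging the two union value schemas added as test resources in this patch.
Schema existing = new Schema.Parser().parse(new File("UnionV1.avsc"));
Schema incoming = new Schema.Parser().parse(new File("UnionV2.avsc"));
Schema superset = AvroSupersetSchemaUtils.generateSupersetSchema(existing, incoming);

// Per testSchemaMergeUnionWithComplexItemType: the "field" union keeps a single "subField"
// record branch carrying the newer schema's doc/default, and "age" (present only in V1)
// is preserved in the superset.
Schema mergedSubField = superset.getField("field").schema().getTypes().get(1);
System.out.println(mergedSubField.toString(true));
System.out.println(superset.getField("age") != null);
```

The design choice reflected in the updated javadoc is that when the same named sub-schema appears in both the existing and the new schema, the merge recurses into it and lets the new schema's information take priority, instead of keeping two conflicting branches in the union.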