diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index bf3e35fb22..75a8d6c454 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -102,6 +102,7 @@ import com.linkedin.venice.utils.LatencyUtils; import com.linkedin.venice.utils.PartitionUtils; import com.linkedin.venice.utils.RedundantExceptionFilter; +import com.linkedin.venice.utils.RetryUtils; import com.linkedin.venice.utils.SparseConcurrentList; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Timer; @@ -2219,7 +2220,18 @@ protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTop if (offsetFromConsumer >= 0) { return offsetFromConsumer; } - return getTopicManager(kafkaUrl).getLatestOffsetCached(pubSubTopic, partition); + try { + return RetryUtils.executeWithMaxAttempt(() -> { + long offset = getTopicManager(kafkaUrl).getLatestOffsetCachedNonBlocking(pubSubTopic, partition); + if (offset == -1) { + throw new VeniceException("Found latest offset -1"); + } + return offset; + }, 5, Duration.ofSeconds(1), Collections.singletonList(VeniceException.class)); + } catch (Exception e) { + LOGGER.error("Could not find latest offset for {} even after 5 retries", pubSubTopic.getName()); + return -1; + } } protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index a5b5969a80..d02e59bd9c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -3017,6 +3017,9 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica doReturn(5L).when(mockTopicManager).getLatestOffsetCached(any(), anyInt()); doReturn(150L).when(mockTopicManagerRemoteKafka).getLatestOffsetCached(any(), anyInt()); doReturn(150L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any()); + long endOffset = + storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 1); + assertEquals(endOffset, 150L); if (nodeType == NodeType.LEADER) { // case 6a: leader replica => partition is not ready to serve doReturn(LeaderFollowerStateType.LEADER).when(mockPcsBufferReplayStartedRemoteLagging).getLeaderFollowerState(); @@ -3176,6 +3179,15 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT } else { assertTrue(storeIngestionTaskUnderTest.isReadyToServe(mockPcsMultipleSourceKafkaServers)); } + doReturn(10L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any()); + long endOffset = + storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0); + assertEquals(endOffset, 10L); + doReturn(-1L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any()); + endOffset = + storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0); + assertEquals(endOffset, 0L); + } @DataProvider diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java index 1c3c0ed915..c91d3c452e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java @@ -731,6 +731,11 @@ public long getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId) { return topicMetadataFetcher.getLatestOffsetCached(new PubSubTopicPartitionImpl(pubSubTopic, partitionId)); } + public long getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId) { + return topicMetadataFetcher + .getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, partitionId)); + } + public long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries) { return topicMetadataFetcher.getProducerTimestampOfLastDataMessageWithRetries(pubSubTopicPartition, retries); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java index 9c8e80f4ac..4a4e7ac85e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java @@ -379,6 +379,25 @@ CompletableFuture getLatestOffsetWithRetriesAsync(PubSubTopicPartition pub .supplyAsync(() -> getLatestOffsetWithRetries(pubSubTopicPartition, retries), threadPoolExecutor); } + long getLatestOffsetCachedNonBlocking(PubSubTopicPartition pubSubTopicPartition) { + ValueAndExpiryTime cachedValue; + cachedValue = latestOffsetCache.get(pubSubTopicPartition); + updateCacheAsync( + pubSubTopicPartition, + cachedValue, + latestOffsetCache, + () -> getLatestOffsetWithRetriesAsync( + pubSubTopicPartition, + DEFAULT_MAX_RETRIES_FOR_POPULATING_TMD_CACHE_ENTRY)); + if (cachedValue == null) { + cachedValue = latestOffsetCache.get(pubSubTopicPartition); + if (cachedValue == null) { + return -1; + } + } + return cachedValue.getValue(); + } + long getLatestOffsetCached(PubSubTopicPartition pubSubTopicPartition) { ValueAndExpiryTime cachedValue; try { diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java index 214be223c5..bca7d15629 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java @@ -260,6 +260,9 @@ public void testGetTopicLatestOffsets() { assertEquals(res.size(), offsetsMap.size()); assertEquals(res.get(0), 111L); assertEquals(res.get(1), 222L); + assertEquals( + topicMetadataFetcher.getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, 0)), + -1); verify(consumerMock, times(3)).partitionsFor(pubSubTopic); verify(consumerMock, times(1)).endOffsets(eq(offsetsMap.keySet()), any(Duration.class));