[common][server][dvc-client] Make a true non-blocking end-offset fetcher (#1170)

In the recent past we have seen cases where the end-offset fetch call stalls for a very long time. This can happen for multiple reasons, such as a general Kafka outage, a mismatched VT vs. RT partition count leading to a non-existent partition, or other network issues.
Even though we cache the offset value, there is a blocking call before the cache is updated. This holds the lock for a very long time, and subsequent calls (whether from metrics collection or the ready-to-serve check) wait forever. Since the drainer thread is shared, this blocks the processing of other resources in that drainer thread, leading to cluster-wide impact. This PR checks for a cache miss and returns immediately with a sentinel value while a non-blocking call updates the cache asynchronously.
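
For illustration, here is a minimal, self-contained sketch of the pattern this change applies (hypothetical class and names, not the actual Venice code): on a cache miss the caller gets a sentinel value back immediately, while an asynchronous task refreshes the cache in the background.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Sketch of a non-blocking cached end-offset lookup. A cache miss returns the
// sentinel -1 right away and schedules an async refresh, so callers (metrics
// collection, ready-to-serve checks) never block on a slow or unreachable broker.
// All names are illustrative.
public class NonBlockingOffsetCache {
  private static final long SENTINEL = -1L;

  private final Map<String, Long> cache = new ConcurrentHashMap<>();
  private final Map<String, CompletableFuture<Long>> inFlight = new ConcurrentHashMap<>();
  private final ExecutorService executor = Executors.newCachedThreadPool();
  private final Supplier<Long> blockingEndOffsetFetch; // stands in for the real broker call

  public NonBlockingOffsetCache(Supplier<Long> blockingEndOffsetFetch) {
    this.blockingEndOffsetFetch = blockingEndOffsetFetch;
  }

  public long getLatestOffsetNonBlocking(String topicPartition) {
    Long cached = cache.get(topicPartition);
    // Schedule at most one in-flight refresh per partition; never wait on it here.
    inFlight.computeIfAbsent(topicPartition, tp ->
        CompletableFuture.supplyAsync(blockingEndOffsetFetch, executor)
            .whenComplete((offset, error) -> {
              if (error == null && offset != null && offset >= 0) {
                cache.put(tp, offset);
              }
              inFlight.remove(tp);
            }));
    return cached != null ? cached : SENTINEL;
  }
}
```

The actual change follows this shape in TopicMetadataFetcher#getLatestOffsetCachedNonBlocking, and the caller in StoreIngestionTask wraps the call in a short bounded retry (5 attempts, 1-second delay) so a transient -1 is retried a few times before being surfaced.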
---------

Co-authored-by: Sourav Maji <tester@linkedin.com>
majisourav99 and Sourav Maji authored Sep 20, 2024
1 parent bd2de8e commit 262101c
Showing 5 changed files with 52 additions and 1 deletion.
@@ -102,6 +102,7 @@
 import com.linkedin.venice.utils.LatencyUtils;
 import com.linkedin.venice.utils.PartitionUtils;
 import com.linkedin.venice.utils.RedundantExceptionFilter;
+import com.linkedin.venice.utils.RetryUtils;
 import com.linkedin.venice.utils.SparseConcurrentList;
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Timer;
@@ -2219,7 +2220,18 @@ protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTop
     if (offsetFromConsumer >= 0) {
       return offsetFromConsumer;
     }
-    return getTopicManager(kafkaUrl).getLatestOffsetCached(pubSubTopic, partition);
+    try {
+      return RetryUtils.executeWithMaxAttempt(() -> {
+        long offset = getTopicManager(kafkaUrl).getLatestOffsetCachedNonBlocking(pubSubTopic, partition);
+        if (offset == -1) {
+          throw new VeniceException("Found latest offset -1");
+        }
+        return offset;
+      }, 5, Duration.ofSeconds(1), Collections.singletonList(VeniceException.class));
+    } catch (Exception e) {
+      LOGGER.error("Could not find latest offset for {} even after 5 retries", pubSubTopic.getName());
+      return -1;
+    }
   }
 
   protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition) {
@@ -3017,6 +3017,9 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica
     doReturn(5L).when(mockTopicManager).getLatestOffsetCached(any(), anyInt());
     doReturn(150L).when(mockTopicManagerRemoteKafka).getLatestOffsetCached(any(), anyInt());
     doReturn(150L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any());
+    long endOffset =
+        storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 1);
+    assertEquals(endOffset, 150L);
     if (nodeType == NodeType.LEADER) {
       // case 6a: leader replica => partition is not ready to serve
       doReturn(LeaderFollowerStateType.LEADER).when(mockPcsBufferReplayStartedRemoteLagging).getLeaderFollowerState();
@@ -3176,6 +3179,15 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT
     } else {
       assertTrue(storeIngestionTaskUnderTest.isReadyToServe(mockPcsMultipleSourceKafkaServers));
     }
+    doReturn(10L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any());
+    long endOffset =
+        storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0);
+    assertEquals(endOffset, 10L);
+    doReturn(-1L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any());
+    endOffset =
+        storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0);
+    assertEquals(endOffset, 0L);
+
   }
 
   @DataProvider
@@ -731,6 +731,11 @@ public long getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId) {
     return topicMetadataFetcher.getLatestOffsetCached(new PubSubTopicPartitionImpl(pubSubTopic, partitionId));
   }
 
+  public long getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId) {
+    return topicMetadataFetcher
+        .getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, partitionId));
+  }
+
   public long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries) {
     return topicMetadataFetcher.getProducerTimestampOfLastDataMessageWithRetries(pubSubTopicPartition, retries);
   }
@@ -379,6 +379,25 @@ CompletableFuture<Long> getLatestOffsetWithRetriesAsync(PubSubTopicPartition pub
         .supplyAsync(() -> getLatestOffsetWithRetries(pubSubTopicPartition, retries), threadPoolExecutor);
   }
 
+  long getLatestOffsetCachedNonBlocking(PubSubTopicPartition pubSubTopicPartition) {
+    ValueAndExpiryTime<Long> cachedValue;
+    cachedValue = latestOffsetCache.get(pubSubTopicPartition);
+    updateCacheAsync(
+        pubSubTopicPartition,
+        cachedValue,
+        latestOffsetCache,
+        () -> getLatestOffsetWithRetriesAsync(
+            pubSubTopicPartition,
+            DEFAULT_MAX_RETRIES_FOR_POPULATING_TMD_CACHE_ENTRY));
+    if (cachedValue == null) {
+      cachedValue = latestOffsetCache.get(pubSubTopicPartition);
+      if (cachedValue == null) {
+        return -1;
+      }
+    }
+    return cachedValue.getValue();
+  }
+
   long getLatestOffsetCached(PubSubTopicPartition pubSubTopicPartition) {
     ValueAndExpiryTime<Long> cachedValue;
     try {
@@ -260,6 +260,9 @@ public void testGetTopicLatestOffsets() {
     assertEquals(res.size(), offsetsMap.size());
     assertEquals(res.get(0), 111L);
     assertEquals(res.get(1), 222L);
+    assertEquals(
+        topicMetadataFetcher.getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, 0)),
+        -1);
 
     verify(consumerMock, times(3)).partitionsFor(pubSubTopic);
     verify(consumerMock, times(1)).endOffsets(eq(offsetsMap.keySet()), any(Duration.class));