diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
index 8acc5d3535..577eb0f8d5 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
@@ -79,6 +79,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -818,4 +819,42 @@ static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpec
     }
     return status;
   }
+
+  /** @return the answer of {@code ingestionService.hasCurrentVersionBootstrapping()}; this method only delegates. */
+  public boolean hasCurrentVersionBootstrapping() {
+    return ingestionService.hasCurrentVersionBootstrapping();
+  }
+
+  /**
+   * Exposes a future that completes (with {@code null}) once {@link DaVinciBackend#hasCurrentVersionBootstrapping()}
+   * returns {@code false}. The condition is polled every 3 seconds, starting immediately, on a dedicated
+   * single-thread scheduled executor that is shut down as soon as the future completes, normally or exceptionally.
+   */
+  static class BootstrappingAwareCompletableFuture {
+    private final ScheduledExecutorService scheduledExecutor =
+        Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("DaVinci_Bootstrapping_Check_Executor"));
+    public final CompletableFuture<Void> bootstrappingFuture = new CompletableFuture<>();
+
+    public BootstrappingAwareCompletableFuture(DaVinciBackend backend) {
+      scheduledExecutor.scheduleAtFixedRate(() -> {
+        if (bootstrappingFuture.isDone()) {
+          return;
+        }
+        try {
+          if (!backend.hasCurrentVersionBootstrapping()) {
+            bootstrappingFuture.complete(null);
+          }
+        } catch (Throwable t) {
+          // An exception escaping a scheduleAtFixedRate task silently cancels all further runs, which would leave
+          // the future pending forever and the executor thread alive; surface the failure to the waiters instead.
+          bootstrappingFuture.completeExceptionally(t);
+        }
+      }, 0, 3, TimeUnit.SECONDS);
+      bootstrappingFuture.whenComplete((ignored1, ignored2) -> scheduledExecutor.shutdown());
+    }
+
+    public CompletableFuture<Void> getBootstrappingFuture() {
+      return bootstrappingFuture;
+    }
+  }
 }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java
index e3b0cc59e8..b609f32b4f 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java
+++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java
@@ -193,7 +193,7 @@ synchronized void tryStartHeartbeat() {
     if (isReportingPushStatus() && heartbeat == null) {
       heartbeat = backend.getExecutor().scheduleAtFixedRate(() -> {
         try {
-          backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
+          sendOutHeartbeat(backend, version);
         } catch (Throwable t) {
           LOGGER.error("Unable to send heartbeat for {}", this);
         }
@@ -201,6 +201,27 @@ synchronized void tryStartHeartbeat() {
     }
   }
 
+  /**
+   * Writes one heartbeat for the store that {@code version} belongs to. While the backend reports that a current
+   * version is still bootstrapping, the bootstrapping-specific heartbeat is written instead of the regular one.
+   *
+   * @param backend used to check the bootstrapping state and to obtain the push status store writer
+   * @param version the version whose store name the heartbeat is written for
+   */
+  protected static void sendOutHeartbeat(DaVinciBackend backend, Version version) {
+    if (backend.hasCurrentVersionBootstrapping()) {
+      LOGGER.info(
+          "DaVinci is still bootstrapping, so it will send heart-beat message with a special timestamp"
+              + " for store: {} to avoid delaying the new push job",
+          version.getStoreName());
+      // Tell backend that the report from the bootstrapping instance doesn't count to avoid
+      // delaying new pushes.
+      backend.getPushStatusStoreWriter().writeHeartbeatForBootstrappingInstance(version.getStoreName());
+    } else {
+      backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
+    }
+  }
+
   synchronized void tryStopHeartbeat() {
     if (heartbeat != null && partitionFutures.values().stream().allMatch(CompletableFuture::isDone)) {
       heartbeat.cancel(true);
@@ -359,9 +380,40 @@ synchronized CompletableFuture subscribe(ComplementSet partitions
       futures.add(partitionFutures.get(partition));
     }
 
-    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
+    CompletableFuture bootstrappingAwareSubscriptionFuture = new CompletableFuture<>();
+
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
       storeBackendStats.recordSubscribeDuration(Duration.between(startTime, Instant.now()));
+      if (e != null) {
+        bootstrappingAwareSubscriptionFuture.completeExceptionally(e);
+        LOGGER.warn("Bootstrapping store: {}, version: {} failed", 
version.getStoreName(), version.getNumber(), e); + } else { + LOGGER.info("Bootstrapping store: {}, version: {} is completed", version.getStoreName(), version.getNumber()); + /** + * It is important to start polling the bootstrapping status after the version ingestion is completed to + * make sure the bootstrapping status polling is valid (not doing polling without any past/active ingestion tasks). + */ + new DaVinciBackend.BootstrappingAwareCompletableFuture(backend).getBootstrappingFuture() + .whenComplete((ignored, ee) -> { + if (ee != null) { + bootstrappingAwareSubscriptionFuture.completeExceptionally(ee); + LOGGER.warn( + "Bootstrapping aware subscription to store: {}, version: {} failed", + version.getStoreName(), + version.getNumber(), + ee); + } else { + bootstrappingAwareSubscriptionFuture.complete(null); + LOGGER.info( + "Bootstrapping aware subscription to store: {}, version: {} is completed", + version.getStoreName(), + version.getNumber()); + } + }); + } }); + + return bootstrappingAwareSubscriptionFuture; } synchronized void unsubscribe(ComplementSet partitions) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index e073b20b72..ac4cbc0e65 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -557,6 +557,20 @@ private static void shutdownExecutorService(ExecutorService executor, String nam } } + public boolean hasCurrentVersionBootstrapping() { + return hasCurrentVersionBootstrapping(topicNameToIngestionTaskMap); + } + + public static boolean hasCurrentVersionBootstrapping(Map ingestionTaskMap) { + for (Map.Entry entry: ingestionTaskMap.entrySet()) { + StoreIngestionTask task = entry.getValue(); + 
if (task.isCurrentVersion() && !task.hasAllPartitionReportedCompleted()) { + return true; + } + } + return false; + } + /** * Stops all the Kafka consumption tasks. * Closes all the Kafka clients. diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 50b36b157a..2bab2878d8 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -4,6 +4,10 @@ import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR; import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN; import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -11,6 +15,9 @@ import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -119,4 +126,17 @@ public void testGetDaVinciErrorStatusWithInvalidCases( } } + @Test + public void testBootstrappingAwareCompletableFuture() + throws ExecutionException, InterruptedException, TimeoutException { + DaVinciBackend backend = mock(DaVinciBackend.class); + + when(backend.hasCurrentVersionBootstrapping()).thenReturn(true).thenReturn(false); + + DaVinciBackend.BootstrappingAwareCompletableFuture future = + new DaVinciBackend.BootstrappingAwareCompletableFuture(backend); + 
future.getBootstrappingFuture().get(10, TimeUnit.SECONDS); + verify(backend, times(2)).hasCurrentVersionBootstrapping(); + } + } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java index b1f636edb2..e11dc14412 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java @@ -37,6 +37,7 @@ import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.ReferenceCounted; +import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import io.tehuti.Metric; @@ -98,6 +99,7 @@ void setUp() { when(backend.getVeniceCurrentVersion(anyString())).thenCallRealMethod(); when(backend.getIngestionBackend()).thenReturn(ingestionBackend); when(backend.getCompressorFactory()).thenReturn(compressorFactory); + when(backend.hasCurrentVersionBootstrapping()).thenReturn(false); doCallRealMethod().when(backend).handleStoreChanged(any()); store = new ZKStore( @@ -146,7 +148,7 @@ void testSubscribeCurrentVersion() throws Exception { CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition)); TimeUnit.MILLISECONDS.sleep(v1SubscribeDurationMs); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that subscribe selected the current version by default. try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); @@ -220,7 +222,7 @@ void testSubscribeWithoutCurrentVersion() throws Exception { // Expecting to subscribe to the latest version (version2). 
CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition)); versionMap.get(version2.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that subscribe selected the latest version as current. try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber()); @@ -238,7 +240,7 @@ void testSubscribeBootstrapVersion() throws Exception { // Expecting to subscribe to the specified version (version1), which is neither current nor latest. CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition), Optional.of(version1)); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that subscribe selected the specified version as current. try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); @@ -247,9 +249,11 @@ void testSubscribeBootstrapVersion() throws Exception { // Simulate future version ingestion is complete. versionMap.get(version2.kafkaTopicName()).completePartition(partition); // Verify that future version became current once ingestion is complete. - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber()); + } + }); } @Test @@ -258,7 +262,7 @@ void testFutureVersionFailure() throws Exception { // Expecting to subscribe to version1 and that version2 is a future version. 
CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition)); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); assertTrue(versionMap.containsKey(version2.kafkaTopicName())); // Simulate future version kill and removal from Venice. @@ -280,23 +284,29 @@ void testFutureVersionFailure() throws Exception { versionMap.get(version3.kafkaTopicName()).completePartitionExceptionally(partition, new Exception()); // Verify that neither of the bad versions became current. - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); + } + }); versionMap.get(version4.kafkaTopicName()).completePartition(partition); // Verify that version 4 did not become current even if ingestion is complete. - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); + } + }); // Mark the version 4 as current. store.setCurrentVersion(version4.getNumber()); backend.handleStoreChanged(storeBackend); // Verify that successfully ingested version became current. 
- try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version4.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version4.getNumber()); + } + }); // Simulate new version push and subsequent ingestion failure. Version version5 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30); @@ -320,16 +330,18 @@ void testSubscribeUnsubscribe() throws Exception { assertFalse(subscribeResult.isDone()); storeBackend.unsubscribe(ComplementSet.of(1)); // Verify that unsubscribe completed pending subscribe without failing it. - subscribeResult.get(0, TimeUnit.SECONDS); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertTrue(versionRef.get().isPartitionReadyToServe(0)); - assertFalse(versionRef.get().isPartitionReadyToServe(1)); - } + subscribeResult.get(3, TimeUnit.SECONDS); + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertTrue(versionRef.get().isPartitionReadyToServe(0)); + assertFalse(versionRef.get().isPartitionReadyToServe(1)); + } + }); // Simulate unsubscribe from all partitions while future version ingestion is pending. subscribeResult = storeBackend.subscribe(ComplementSet.universalSet()); storeBackend.unsubscribe(ComplementSet.universalSet()); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that all versions were deleted because subscription set became empty. 
assertTrue(versionMap.isEmpty()); assertEquals(FileUtils.sizeOfDirectory(baseDataPath), 0); @@ -377,24 +389,30 @@ void testRollbackAndRollForward() { versionMap.get(version3.kafkaTopicName()).completePartition(partition); store.setCurrentVersion(version3.getNumber()); backend.handleStoreChanged(storeBackend); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); + } + }); // Rollback happens here, expecting Da Vinci to switch back to v1. store.setCurrentVersion(1); backend.handleStoreChanged(storeBackend); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); + } + }); versionMap.get(version2.kafkaTopicName()).completePartition(partition); store.setCurrentVersion(3); backend.handleStoreChanged(storeBackend); versionMap.get(version3.kafkaTopicName()).completePartition(partition); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); + } + }); } } diff --git 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java index ef19d58989..0c22d61d94 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java @@ -6,12 +6,14 @@ import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.ArrayList; import java.util.Collections; @@ -89,4 +91,32 @@ public void testMaybeReportBatchEOIPStatus() { verify(mockConsumer, times(1)).accept("inc_49"); Assert.assertFalse(partitionToBatchReportEOIPEnabled.get(0)); } + + @Test + public void testSendOutHeartBeat() { + String storeName = "test_store"; + DaVinciBackend backend = mock(DaVinciBackend.class); + doReturn(true).when(backend).hasCurrentVersionBootstrapping(); + PushStatusStoreWriter mockWriter = mock(PushStatusStoreWriter.class); + doReturn(mockWriter).when(backend).getPushStatusStoreWriter(); + + Version currentVersion = mock(Version.class); + doReturn(storeName).when(currentVersion).getStoreName(); + doReturn(1).when(currentVersion).getNumber(); + Version futureVersion = mock(Version.class); + doReturn(storeName).when(futureVersion).getStoreName(); + doReturn(2).when(futureVersion).getNumber(); + + VersionBackend.sendOutHeartbeat(backend, currentVersion); + VersionBackend.sendOutHeartbeat(backend, futureVersion); + + verify(mockWriter, 
times(2)).writeHeartbeatForBootstrappingInstance(storeName); + verify(mockWriter, never()).writeHeartbeat(storeName); + + doReturn(false).when(backend).hasCurrentVersionBootstrapping(); + VersionBackend.sendOutHeartbeat(backend, currentVersion); + VersionBackend.sendOutHeartbeat(backend, futureVersion); + + verify(mockWriter, times(2)).writeHeartbeat(storeName); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index b537fa8291..37c664f45b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -12,6 +12,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; @@ -53,6 +54,8 @@ import io.tehuti.metrics.MetricsRepository; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import java.util.HashMap; +import java.util.Map; import java.util.NavigableMap; import java.util.Optional; import java.util.Properties; @@ -449,4 +452,41 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest kafkaStoreIngestionService.getTopicPartitionIngestionContext(topicName, topicName, 0); verify(kafkaConsumerService, atMostOnce()).getIngestionInfoFromConsumer(pubSubTopic, pubSubTopicPartition); } + + @Test + public void testHasCurrentVersionBootstrapping() { + StoreIngestionTask nonCurrentVersionBootstrappingTask = mock(StoreIngestionTask.class); + 
doReturn(false).when(nonCurrentVersionBootstrappingTask).isCurrentVersion(); + doReturn(false).when(nonCurrentVersionBootstrappingTask).hasAllPartitionReportedCompleted(); + + StoreIngestionTask nonCurrentVersionCompletedTask = mock(StoreIngestionTask.class); + doReturn(false).when(nonCurrentVersionCompletedTask).isCurrentVersion(); + doReturn(true).when(nonCurrentVersionCompletedTask).hasAllPartitionReportedCompleted(); + + StoreIngestionTask currentVersionBootstrappingTask = mock(StoreIngestionTask.class); + doReturn(true).when(currentVersionBootstrappingTask).isCurrentVersion(); + doReturn(false).when(currentVersionBootstrappingTask).hasAllPartitionReportedCompleted(); + + StoreIngestionTask currentVersionCompletedTask = mock(StoreIngestionTask.class); + doReturn(true).when(currentVersionCompletedTask).isCurrentVersion(); + doReturn(true).when(currentVersionCompletedTask).hasAllPartitionReportedCompleted(); + + Map mapContainsAllCompletedTask = new HashMap<>(); + mapContainsAllCompletedTask.put("non_current_version_completed", nonCurrentVersionCompletedTask); + mapContainsAllCompletedTask.put("current_version_completed", currentVersionCompletedTask); + + assertFalse(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsAllCompletedTask)); + + Map mapContainsNonCurrentBootstrappingTask = new HashMap<>(); + mapContainsNonCurrentBootstrappingTask.put("non_current_version_bootstrapping", nonCurrentVersionBootstrappingTask); + mapContainsNonCurrentBootstrappingTask.put("current_version_completed", currentVersionCompletedTask); + + assertFalse(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsNonCurrentBootstrappingTask)); + + Map mapContainsCurrentBootstrappingTask = new HashMap<>(); + mapContainsCurrentBootstrappingTask.put("non_current_version_bootstrapping", nonCurrentVersionBootstrappingTask); + mapContainsCurrentBootstrappingTask.put("current_version_bootstrapping", currentVersionBootstrappingTask); + + 
assertTrue(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsCurrentBootstrappingTask)); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java index 0070b222bf..73cefd9518 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java @@ -34,6 +34,10 @@ * This class is a helper class for Venice controller to read PushStatus / Heartbeat messages. */ public class PushStatusStoreReader implements Closeable { + public enum InstanceStatus { + ALIVE, DEAD, BOOTSTRAPPING + } + private static final Logger LOGGER = LogManager.getLogger(PushStatusStoreReader.class); private static final int DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS = 3; private final Map> veniceClients = @@ -226,11 +230,23 @@ public long getHeartbeat(String storeName, String instanceName) { } public boolean isInstanceAlive(String storeName, String instanceName) { - long lastReportTimeStamp = getHeartbeat(storeName, instanceName); + return isInstanceAlive(getHeartbeat(storeName, instanceName)); + } + + boolean isInstanceAlive(long lastReportTimeStamp) { return System.currentTimeMillis() - lastReportTimeStamp <= TimeUnit.SECONDS .toMillis(heartbeatExpirationTimeInSeconds); } + public InstanceStatus getInstanceStatus(String storeName, String instanceName) { + long lastReportTimeStamp = getHeartbeat(storeName, instanceName); + if (lastReportTimeStamp < 0) { + return InstanceStatus.BOOTSTRAPPING; + } + + return isInstanceAlive(lastReportTimeStamp) ? 
InstanceStatus.ALIVE : InstanceStatus.DEAD; + } + public Map getSupposedlyOngoingIncrementalPushVersions(String storeName, int storeVersion) { AvroSpecificStoreClient storeClient = getVeniceClient(storeName); PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index 1e6c759743..9a8a2ce6b7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -67,6 +67,15 @@ public void writeHeartbeat(String storeName) { writeHeartbeat(storeName, System.currentTimeMillis()); } + /** + * This function will write `-1` to indicate the node is bootstrapping and Controller + * should ignore all the reports from this instance. 
+ * @param storeName + */ + public void writeHeartbeatForBootstrappingInstance(String storeName) { + writeHeartbeat(storeName, -1); + } + public void writeHeartbeat(String storeName, long heartbeat) { VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java index ebd133c10f..763fdc2b09 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java @@ -6,6 +6,7 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anySet; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -13,6 +14,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEqualsDeep; import static org.testng.Assert.assertNotEquals; @@ -365,4 +367,22 @@ public void testNullResponseWhenVersionLevelKeyIsNotWritten() // Test that push status store reader will also return null instead of empty map in this case Assert.assertNull(storeReaderSpy.getVersionStatus(storeName, storeVersion)); } + + @Test + public void testGetInstanceStatus() { + PushStatusStoreReader mockReader = mock(PushStatusStoreReader.class); + doCallRealMethod().when(mockReader).getInstanceStatus(any(), any()); + + doReturn(-1l).when(mockReader).getHeartbeat("store_1", "instance_1"); + assertEquals( + 
mockReader.getInstanceStatus("store_1", "instance_1"), + PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING); + + doReturn(1000l).when(mockReader).getHeartbeat("store_1", "instance_1"); + doReturn(true).when(mockReader).isInstanceAlive(anyLong()); + assertEquals(mockReader.getInstanceStatus("store_1", "instance_1"), PushStatusStoreReader.InstanceStatus.ALIVE); + + doReturn(false).when(mockReader).isInstanceAlive(anyLong()); + assertEquals(mockReader.getInstanceStatus("store_1", "instance_1"), PushStatusStoreReader.InstanceStatus.DEAD); + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java index 1a569166d5..d6c6bdf33e 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java @@ -144,4 +144,13 @@ public void testWriteHeartbeat() { verify(veniceWriterMock) .update(eq(statusKey), eq(getHeartbeatRecord(heartbeat)), eq(valueSchemaId), eq(derivedSchemaId), eq(null)); } + + @Test + public void testWriteHeartbeatForBootstrappingInstance() { + PushStatusKey statusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); + pushStatusStoreWriter.writeHeartbeatForBootstrappingInstance(storeName); + verify(veniceWriterCacheMock).prepareVeniceWriter(storeName); + verify(veniceWriterMock) + .update(eq(statusKey), eq(getHeartbeatRecord(-1)), eq(valueSchemaId), eq(derivedSchemaId), eq(null)); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 40e6a543f3..45a393de71 100644 --- 
a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -775,7 +775,7 @@ public void testHybridStore() throws Exception { // Isolated clients should not be able to unsubscribe partitions of other clients. client3.unsubscribeAll(); - client3.subscribe(Collections.singleton(partition)).get(0, TimeUnit.SECONDS); + client3.subscribe(Collections.singleton(partition)).get(10, TimeUnit.SECONDS); for (int i = 0; i < KEY_COUNT; i++) { final int key = i; // Both client2 & client4 are not subscribed to any partition. But client2 is not-isolated so it can diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java index d66b4a69cd..0eb52dd8f2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java @@ -89,6 +89,15 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( Set incompleteInstanceList = new HashSet<>(); ExecutionStatus errorStatus = ExecutionStatus.ERROR; for (Map.Entry entry: instances.entrySet()) { + PushStatusStoreReader.InstanceStatus instanceStatus = + reader.getInstanceStatus(storeName, entry.getKey().toString()); + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING)) { + LOGGER.info( + "Skipping ingestion status report from bootstrapping instance: {} for topic: {}", + entry.getKey().toString(), + topicName); + continue; + } ExecutionStatus status = ExecutionStatus.valueOf(entry.getValue()); // We will skip completed instances, as they have stopped emitting heartbeats and will not be counted as live // instances. 
@@ -96,8 +105,7 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( completedInstanceCount++; continue; } - boolean isInstanceAlive = reader.isInstanceAlive(storeName, entry.getKey().toString()); - if (!isInstanceAlive) { + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.DEAD)) { offlineInstanceCount++; // Keep at most 5 offline instances for logging purpose. if (offlineInstanceList.size() < 5) { @@ -234,13 +242,27 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe /** * This cache is used to reduce the duplicate calls for liveness check as one host can host multiple partitions. */ - Map instanceLivenessCache = new HashMap<>(); + Map instanceLivenessCache = new HashMap<>(); for (int partitionId = 0; partitionId < partitionCount; partitionId++) { Map instances = reader.getPartitionStatus(storeName, version, partitionId, incrementalPushVersion); boolean allInstancesCompleted = true; totalReplicaCount += instances.size(); for (Map.Entry entry: instances.entrySet()) { + String instanceName = entry.getKey().toString(); + PushStatusStoreReader.InstanceStatus instanceStatus = instanceLivenessCache + .computeIfAbsent(instanceName, ignored -> reader.getInstanceStatus(storeName, instanceName)); + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING)) { + // Don't count bootstrapping instance status report. + totalReplicaCount--; + LOGGER.info( + "Skipping ingestion status report from bootstrapping node: {} for topic: {}, partition: {}", + entry.getKey().toString(), + topicName, + partitionId); + continue; + } + ExecutionStatus status = ExecutionStatus.valueOf(entry.getValue()); // We will skip completed replicas, as they have stopped emitting heartbeats and will not be counted as live // replicas. 
@@ -248,10 +270,7 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe completedReplicaCount++; continue; } - String instanceName = entry.getKey().toString(); - boolean isInstanceAlive = instanceLivenessCache - .computeIfAbsent(instanceName, ignored -> reader.isInstanceAlive(storeName, instanceName)); - if (!isInstanceAlive) { + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.DEAD)) { // Keep at most 5 offline instances for logging purpose. if (offlineInstanceList.size() < 5) { offlineInstanceList.add(entry.getKey().toString()); @@ -357,4 +376,9 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe static void setDaVinciErrorInstanceWaitTime(int time) { daVinciErrorInstanceWaitTime = time; } + + // For testing purpose + static void setDVCDeadInstanceTime(String topicName, long timestamp) { + storeVersionToDVCDeadInstanceTimeMap.put(topicName, timestamp); + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java index 6d3df2d559..334dc51cf6 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java @@ -21,22 +21,32 @@ public class PushMonitorUtilsTest { @Test public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThreshold() { PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0); + PushMonitorUtils.setDVCDeadInstanceTime("store_v1", System.currentTimeMillis()); PushStatusStoreReader reader = mock(PushStatusStoreReader.class); /** * Instance a is offline and its push status is not completed. * Instance b,c,d are online and their push status is completed. 
* In this case, the overall DaVinci push status can be COMPLETED as long as 1 is below the fail fast threshold. */ - doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("a")); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("b")); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("c")); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("d")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("a")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("b")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("c")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("d")); + // Bootstrapping nodes should be ignored + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("e")); + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("f")); + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("g")); + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("h")); Map map = new HashMap<>(); map.put("a", 2); map.put("b", 10); map.put("c", 10); map.put("d", 10); + map.put("e", 2); + map.put("f", 2); + map.put("g", 2); + map.put("h", 2); // Test partition level key first doReturn(null).when(reader).getVersionStatus("store", 1); @@ -56,10 +66,10 @@ public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThre public void testDaVinciPushStatusScan(boolean useDaVinciSpecificExecutionStatusForError) { PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0); PushStatusStoreReader reader = mock(PushStatusStoreReader.class); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a")); - 
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("b")); - doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("c")); - doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("d")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("a")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("b")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("c")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("d")); Map map = new HashMap<>(); map.put("a", 3); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java index 1f3edb5be3..16bcb20a0e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java @@ -109,7 +109,8 @@ public void testPushStatusCollector() { .thenReturn(startedInstancePushStatus, dvcTooManyDeadInstancesErrorInstancePushStatus); when(pushStatusStoreReader.getPartitionStatus(daVinciStoreName, 11, 0, Optional.empty())) .thenReturn(startedInstancePushStatus, dvcOtherErrorInstancePushStatus); - when(pushStatusStoreReader.isInstanceAlive(daVinciStoreName, "instance")).thenReturn(true); + when(pushStatusStoreReader.getInstanceStatus(daVinciStoreName, "instance")) + .thenReturn(PushStatusStoreReader.InstanceStatus.ALIVE); pushStatusCollector.subscribeTopic(daVinciStoreTopicV1, 1); Assert.assertFalse(pushStatusCollector.getTopicToPushStatusMap().containsKey(daVinciStoreTopicV1)); @@ -241,7 +242,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetry() { .thenReturn(Collections.emptyMap(), 
startedInstancePushStatus, dvcTooManyDeadInstancesErrorInstancePushStatus); when(pushStatusStoreReader.getPartitionStatus(daVinciStoreName, 6, 0, Optional.empty())) .thenReturn(Collections.emptyMap(), startedInstancePushStatus, dvcOtherErrorInstancePushStatus); - when(pushStatusStoreReader.isInstanceAlive(daVinciStoreName, "instance")).thenReturn(true); + when(pushStatusStoreReader.getInstanceStatus(daVinciStoreName, "instance")) + .thenReturn(PushStatusStoreReader.InstanceStatus.ALIVE); pushStatusCollector.subscribeTopic(daVinciStoreTopicV1, 1); Assert.assertFalse(pushStatusCollector.getTopicToPushStatusMap().containsKey(daVinciStoreTopicV1)); @@ -330,7 +332,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetryWhenEmptyResultUntil when(pushStatusStoreReader.getPartitionStatus(daVinciStoreName, 2, 0, Optional.empty())) .thenReturn(Collections.emptyMap()); - when(pushStatusStoreReader.isInstanceAlive(daVinciStoreName, "instance")).thenReturn(true); + when(pushStatusStoreReader.getInstanceStatus(daVinciStoreName, "instance")) + .thenReturn(PushStatusStoreReader.InstanceStatus.ALIVE); pushStatusCollector.subscribeTopic(daVinciStoreTopicV1, 1); Assert.assertFalse(pushStatusCollector.getTopicToPushStatusMap().containsKey(daVinciStoreTopicV1));