Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[da-vinci] Disabled status reporting when the DaVinci app is still bootstrapping #1203

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -602,6 +603,15 @@ protected void reportPushStatus(
int partition,
ExecutionStatus status,
Optional<String> incrementalPushVersion) {
if (hasCurrentVersionBootstrapping()) {
LOGGER.info(
"DaVinci is still bootstrapping, so skip the status report for store version:{}, partition: {}, status: {}{}",
kafkaTopic,
partition,
status,
(incrementalPushVersion.isPresent() ? ", inc push version: " + incrementalPushVersion.get() : ""));
return;
}
VersionBackend versionBackend = versionByTopicMap.get(kafkaTopic);
if (versionBackend != null && versionBackend.isReportingPushStatus()) {
Version version = versionBackend.getVersion();
Expand Down Expand Up @@ -818,4 +828,30 @@ static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpec
}
return status;
}

/**
 * Checks whether any current store version hosted by this backend is still bootstrapping.
 * Delegates to the ingestion service, which inspects its active ingestion tasks.
 *
 * @return true if at least one current-version ingestion task has not yet reported
 *         completion for all of its subscribed partitions
 */
public boolean hasCurrentVersionBootstrapping() {
  return ingestionService.hasCurrentVersionBootstrapping();
}

/**
 * A future that completes once the backend reports that no current version is still
 * bootstrapping. The status is polled on a dedicated daemon thread at a fixed interval,
 * starting immediately; the polling executor is shut down as soon as the future settles
 * (normally or exceptionally), so no thread is leaked.
 */
static class BootstrappingAwareCompletableFuture {
  /** Interval between bootstrapping-status polls, in seconds. */
  private static final long POLLING_INTERVAL_IN_SECONDS = 3;

  private final ScheduledExecutorService scheduledExecutor =
      Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("DaVinci_Bootstrapping_Check_Executor"));
  public final CompletableFuture<Void> bootstrappingFuture = new CompletableFuture<>();

  public BootstrappingAwareCompletableFuture(DaVinciBackend backend) {
    scheduledExecutor.scheduleAtFixedRate(() -> {
      if (bootstrappingFuture.isDone()) {
        return;
      }
      try {
        if (!backend.hasCurrentVersionBootstrapping()) {
          bootstrappingFuture.complete(null);
        }
      } catch (Throwable t) {
        /**
         * Without this guard, a throwing bootstrapping check would silently cancel the
         * periodic task (per {@link ScheduledExecutorService#scheduleAtFixedRate}),
         * leaving the future forever incomplete and the executor thread leaked.
         */
        bootstrappingFuture.completeExceptionally(t);
      }
    }, 0, POLLING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
    // Stop polling and release the daemon thread once the future settles either way.
    bootstrappingFuture.whenComplete((ignored1, ignored2) -> scheduledExecutor.shutdown());
  }

  public CompletableFuture<Void> getBootstrappingFuture() {
    return bootstrappingFuture;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,33 @@ synchronized void tryStartHeartbeat() {
if (isReportingPushStatus() && heartbeat == null) {
heartbeat = backend.getExecutor().scheduleAtFixedRate(() -> {
try {
backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
sendOutHeartbeat(backend, version);
} catch (Throwable t) {
LOGGER.error("Unable to send heartbeat for {}", this);
}
}, 0, heartbeatInterval, TimeUnit.SECONDS);
}
}

/**
 * Emits the push-status heartbeat for the given store, or deletes it when this DaVinci
 * instance is still bootstrapping any current version.
 *
 * @param backend the backend used to check bootstrapping state and obtain the status writer
 * @param version the store version whose store name the heartbeat is keyed on
 */
protected static void sendOutHeartbeat(DaVinciBackend backend, Version version) {
  if (backend.hasCurrentVersionBootstrapping()) {
    LOGGER.info(
        "DaVinci is still bootstrapping, so it will send heart-beat deletion message "
            + "for store: {} to avoid delaying the new push job",
        version.getStoreName());
    /**
     * Delete the heartbeat for the current instance if it is still bootstrapping.
     * The reason to have such logic is that the bootstrapping of current version of
     * some other store can happen later than the current store.
     * We need to delete heartbeat for current instance to make sure Controller
     * doesn't accidentally treat this node as crashed, then fail the push job.
     */
    backend.getPushStatusStoreWriter().deleteHeartbeat(version.getStoreName());
  } else {
    backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
  }
}

synchronized void tryStopHeartbeat() {
if (heartbeat != null && partitionFutures.values().stream().allMatch(CompletableFuture::isDone)) {
heartbeat.cancel(true);
Expand Down Expand Up @@ -359,9 +378,40 @@ synchronized CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions
futures.add(partitionFutures.get(partition));
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
CompletableFuture<Void> bootstrappingAwareSubscriptionFuture = new CompletableFuture<>();

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
storeBackendStats.recordSubscribeDuration(Duration.between(startTime, Instant.now()));
if (e != null) {
bootstrappingAwareSubscriptionFuture.completeExceptionally(e);
LOGGER.warn("Bootstrapping store: {}, version: {} failed", version.getStoreName(), version.getNumber(), e);
} else {
LOGGER.info("Bootstrapping store: {}, version: {} is completed", version.getStoreName(), version.getNumber());
/**
* It is important to start polling the bootstrapping status after the version ingestion is completed to
* make sure the bootstrapping status polling is valid (not doing polling without any past/active ingestion tasks).
*/
new DaVinciBackend.BootstrappingAwareCompletableFuture(backend).getBootstrappingFuture()
.whenComplete((ignored, ee) -> {
if (ee != null) {
bootstrappingAwareSubscriptionFuture.completeExceptionally(ee);
LOGGER.warn(
"Bootstrapping aware subscription to store: {}, version: {} failed",
version.getStoreName(),
version.getNumber(),
ee);
} else {
bootstrappingAwareSubscriptionFuture.complete(null);
LOGGER.info(
"Bootstrapping aware subscription to store: {}, version: {} is completed",
version.getStoreName(),
version.getNumber());
}
});
}
});

return bootstrappingAwareSubscriptionFuture;
}

synchronized void unsubscribe(ComplementSet<Integer> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,20 @@ private static void shutdownExecutorService(ExecutorService executor, String nam
}
}

/**
 * Checks whether any current store version served by this ingestion service is still
 * bootstrapping, by inspecting the live topic-to-ingestion-task map.
 *
 * @return true if at least one current-version ingestion task has not yet reported
 *         completion for all of its partitions
 */
public boolean hasCurrentVersionBootstrapping() {
  return hasCurrentVersionBootstrapping(topicNameToIngestionTaskMap);
}

public static boolean hasCurrentVersionBootstrapping(Map<String, StoreIngestionTask> ingestionTaskMap) {
for (Map.Entry<String, StoreIngestionTask> entry: ingestionTaskMap.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question/concern: Does this work with ingestion isolation feature? Because essentially ingestion will start on forked process

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, it should.
At the startup time, all the version ingestions will happen in the isolated process including current version until it is finished, and once it is finished and handed over to the main process, it won't matter as it is completed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I am still thinking the case -
Say there is a II DVC, now it is bootstrapping with one current version and a future version.
Regarding future version, it will still ingest in forked process until RTS is reached. But in the middle, it will report STARTED/END_OF_PUSH_RECEIVED and the above change in the reportPushStatus logic does block it from reporting. So ZK/system store will still register such status. And when controller is polling the job status, it will see it as a not-ready-replica and blocked on it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it will not block, but it may potentially lead to too many dead instances because you also delete HB for the host, and fail fast for the job? (but if these bootstrapping nodes are small enough, then we might be good.....)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete HB won't result in dead instances and it is used to avoid dead instances..

StoreIngestionTask task = entry.getValue();
if (task.isCurrentVersion() && !task.hasAllPartitionReportedCompleted()) {
return true;
}
}
return false;
}

/**
* Stops all the Kafka consumption tasks.
* Closes all the Kafka clients.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
Expand Down Expand Up @@ -98,6 +99,7 @@ void setUp() {
when(backend.getVeniceCurrentVersion(anyString())).thenCallRealMethod();
when(backend.getIngestionBackend()).thenReturn(ingestionBackend);
when(backend.getCompressorFactory()).thenReturn(compressorFactory);
when(backend.hasCurrentVersionBootstrapping()).thenReturn(false);
doCallRealMethod().when(backend).handleStoreChanged(any());

store = new ZKStore(
Expand Down Expand Up @@ -146,7 +148,7 @@ void testSubscribeCurrentVersion() throws Exception {
CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition));
TimeUnit.MILLISECONDS.sleep(v1SubscribeDurationMs);
versionMap.get(version1.kafkaTopicName()).completePartition(partition);
subscribeResult.get(0, TimeUnit.SECONDS);
subscribeResult.get(3, TimeUnit.SECONDS);
// Verify that subscribe selected the current version by default.
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
Expand Down Expand Up @@ -220,7 +222,7 @@ void testSubscribeWithoutCurrentVersion() throws Exception {
// Expecting to subscribe to the latest version (version2).
CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition));
versionMap.get(version2.kafkaTopicName()).completePartition(partition);
subscribeResult.get(0, TimeUnit.SECONDS);
subscribeResult.get(3, TimeUnit.SECONDS);
// Verify that subscribe selected the latest version as current.
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber());
Expand All @@ -238,7 +240,7 @@ void testSubscribeBootstrapVersion() throws Exception {
// Expecting to subscribe to the specified version (version1), which is neither current nor latest.
CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition), Optional.of(version1));
versionMap.get(version1.kafkaTopicName()).completePartition(partition);
subscribeResult.get(0, TimeUnit.SECONDS);
subscribeResult.get(3, TimeUnit.SECONDS);
// Verify that subscribe selected the specified version as current.
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
Expand All @@ -247,9 +249,11 @@ void testSubscribeBootstrapVersion() throws Exception {
// Simulate future version ingestion is complete.
versionMap.get(version2.kafkaTopicName()).completePartition(partition);
// Verify that future version became current once ingestion is complete.
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber());
}
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber());
}
});
}

@Test
Expand All @@ -258,7 +262,7 @@ void testFutureVersionFailure() throws Exception {
// Expecting to subscribe to version1 and that version2 is a future version.
CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition));
versionMap.get(version1.kafkaTopicName()).completePartition(partition);
subscribeResult.get(0, TimeUnit.SECONDS);
subscribeResult.get(3, TimeUnit.SECONDS);
assertTrue(versionMap.containsKey(version2.kafkaTopicName()));

// Simulate future version kill and removal from Venice.
Expand All @@ -280,23 +284,29 @@ void testFutureVersionFailure() throws Exception {

versionMap.get(version3.kafkaTopicName()).completePartitionExceptionally(partition, new Exception());
// Verify that neither of the bad versions became current.
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
});

versionMap.get(version4.kafkaTopicName()).completePartition(partition);
// Verify that version 4 did not become current even if ingestion is complete.
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
});
// Mark the version 4 as current.
store.setCurrentVersion(version4.getNumber());
backend.handleStoreChanged(storeBackend);

// Verify that successfully ingested version became current.
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version4.getNumber());
}
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version4.getNumber());
}
});

// Simulate new version push and subsequent ingestion failure.
Version version5 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30);
Expand All @@ -320,16 +330,18 @@ void testSubscribeUnsubscribe() throws Exception {
assertFalse(subscribeResult.isDone());
storeBackend.unsubscribe(ComplementSet.of(1));
// Verify that unsubscribe completed pending subscribe without failing it.
subscribeResult.get(0, TimeUnit.SECONDS);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertTrue(versionRef.get().isPartitionReadyToServe(0));
assertFalse(versionRef.get().isPartitionReadyToServe(1));
}
subscribeResult.get(3, TimeUnit.SECONDS);
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertTrue(versionRef.get().isPartitionReadyToServe(0));
assertFalse(versionRef.get().isPartitionReadyToServe(1));
}
});

// Simulate unsubscribe from all partitions while future version ingestion is pending.
subscribeResult = storeBackend.subscribe(ComplementSet.universalSet());
storeBackend.unsubscribe(ComplementSet.universalSet());
subscribeResult.get(0, TimeUnit.SECONDS);
subscribeResult.get(3, TimeUnit.SECONDS);
// Verify that all versions were deleted because subscription set became empty.
assertTrue(versionMap.isEmpty());
assertEquals(FileUtils.sizeOfDirectory(baseDataPath), 0);
Expand Down Expand Up @@ -377,24 +389,30 @@ void testRollbackAndRollForward() {
versionMap.get(version3.kafkaTopicName()).completePartition(partition);
store.setCurrentVersion(version3.getNumber());
backend.handleStoreChanged(storeBackend);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber());
}
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber());
}
});

// Rollback happens here, expecting Da Vinci to switch back to v1.
store.setCurrentVersion(1);
backend.handleStoreChanged(storeBackend);
versionMap.get(version1.kafkaTopicName()).completePartition(partition);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
});
versionMap.get(version2.kafkaTopicName()).completePartition(partition);

store.setCurrentVersion(3);
backend.handleStoreChanged(storeBackend);
versionMap.get(version3.kafkaTopicName()).completePartition(partition);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber());
}
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -89,4 +91,33 @@ public void testMaybeReportBatchEOIPStatus() {
verify(mockConsumer, times(1)).accept("inc_49");
Assert.assertFalse(partitionToBatchReportEOIPEnabled.get(0));
}

@Test
public void testSendOutHeartBeat() {
  String storeName = "test_store";
  PushStatusStoreWriter statusWriter = mock(PushStatusStoreWriter.class);
  DaVinciBackend mockBackend = mock(DaVinciBackend.class);
  doReturn(statusWriter).when(mockBackend).getPushStatusStoreWriter();
  doReturn(true).when(mockBackend).hasCurrentVersionBootstrapping();

  Version versionOne = mock(Version.class);
  doReturn(storeName).when(versionOne).getStoreName();
  doReturn(1).when(versionOne).getNumber();
  Version versionTwo = mock(Version.class);
  doReturn(storeName).when(versionTwo).getStoreName();
  doReturn(2).when(versionTwo).getNumber();

  // While bootstrapping, every heartbeat attempt should delete the heartbeat instead of writing it.
  VersionBackend.sendOutHeartbeat(mockBackend, versionOne);
  VersionBackend.sendOutHeartbeat(mockBackend, versionTwo);

  verify(statusWriter, times(2)).deleteHeartbeat(storeName);

  verify(statusWriter, never()).writeHeartbeat(storeName);

  // Once bootstrapping completes, heartbeats should be written normally for both versions.
  doReturn(false).when(mockBackend).hasCurrentVersionBootstrapping();
  VersionBackend.sendOutHeartbeat(mockBackend, versionOne);
  VersionBackend.sendOutHeartbeat(mockBackend, versionTwo);

  verify(statusWriter, times(2)).writeHeartbeat(storeName);
}
}
Loading
Loading