Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gaojieliu committed Sep 29, 2024
1 parent 31a1df5 commit d99a2a6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ synchronized void tryStartHeartbeat() {
protected static void sendOutHeartbeat(DaVinciBackend backend, Version version) {
if (backend.hasCurrentVersionBootstrapping()) {
LOGGER.info(
"DaVinci still is still bootstrapping, so it will send heart-beat deletion message "
+ "for store: {} to avoid delaying the new push job",
"DaVinci still is still bootstrapping, so it will send heart-beat message with a special timestamp"
+ " for store: {} to avoid delaying the new push job",
version.getStoreName());
/**
* Tell backend that the report from the bootstrapping instance doesn't count to avoid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
Optional<String> erroredInstance = Optional.empty();
Set<String> offlineInstanceList = new HashSet<>();
Set<String> incompleteInstanceList = new HashSet<>();
Set<String> bootstrappingInstanceList = new HashSet<>();
ExecutionStatus errorStatus = ExecutionStatus.ERROR;
for (Map.Entry<CharSequence, Integer> entry: instances.entrySet()) {
ExecutionStatus status = ExecutionStatus.valueOf(entry.getValue());
Expand All @@ -100,9 +99,10 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
PushStatusStoreReader.InstanceStatus instanceStatus =
reader.getInstanceStatus(storeName, entry.getKey().toString());
if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING)) {
if (bootstrappingInstanceList.size() < 5) {
bootstrappingInstanceList.add(entry.getKey().toString());
}
LOGGER.info(
"Skipping ingestion status report from bootstrapping instance: {} for topic: {}",
entry.getKey().toString(),
topicName);
continue;
}
if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.DEAD)) {
Expand Down Expand Up @@ -141,8 +141,7 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES
: ExecutionStatus.ERROR,
"Too many dead instances: " + offlineInstanceCount + ", total instances: " + totalInstanceCount
+ ", example offline instances: " + offlineInstanceList + ", example bootstrapping instances: "
+ bootstrappingInstanceList,
+ ", example offline instances: " + offlineInstanceList,
noDaVinciStatusReported);
}
} else {
Expand Down Expand Up @@ -240,7 +239,6 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
int completedReplicaCount = 0;
Set<String> offlineInstanceList = new HashSet<>();
Set<Integer> incompletePartition = new HashSet<>();
Set<String> bootstrappingInstanceList = new HashSet<>();
/**
* This cache is used to reduce the duplicate calls for liveness check as one host can host multiple partitions.
*/
Expand All @@ -262,12 +260,13 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
PushStatusStoreReader.InstanceStatus instanceStatus = instanceLivenessCache
.computeIfAbsent(instanceName, ignored -> reader.getInstanceStatus(storeName, instanceName));
if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING)) {
// Keep at most 5 bootstrapping instances for logging purpose.
if (bootstrappingInstanceList.size() < 5) {
bootstrappingInstanceList.add(entry.getKey().toString());
}
// Don't count bootstrapping instance status report.
totalReplicaCount--;
LOGGER.info(
"Skipping ingestion status report from bootstrapping node: {} for topic: {}, partition: {}",
entry.getKey().toString(),
topicName,
partitionId);
continue;
}
if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.DEAD)) {
Expand Down Expand Up @@ -313,8 +312,7 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES
: ExecutionStatus.ERROR,
"Too many dead instances: " + offlineReplicaCount + ", total instances: " + totalReplicaCount
+ ", example offline instances: " + offlineInstanceList + " example bootstrapping instances: "
+ bootstrappingInstanceList,
+ ", example offline instances: " + offlineInstanceList,
noDaVinciStatusReported);
}
} else {
Expand Down Expand Up @@ -377,4 +375,9 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
static void setDaVinciErrorInstanceWaitTime(int time) {
daVinciErrorInstanceWaitTime = time;
}

// For testing purpose
static void setDVCDeadInstanceTime(String topicName, long timestamp) {
storeVersionToDVCDeadInstanceTimeMap.put(topicName, timestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,32 @@ public class PushMonitorUtilsTest {
@Test
public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThreshold() {
PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0);
PushMonitorUtils.setDVCDeadInstanceTime("store_v1", System.currentTimeMillis());
PushStatusStoreReader reader = mock(PushStatusStoreReader.class);
/**
* Instance a is offline and its push status is not completed.
* Instance b,c,d are online and their push status is completed.
* In this case, the overall DaVinci push status can be COMPLETED as long as 1 is below the fail fast threshold.
*/
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("a"));
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("b"));
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("c"));
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("d"));
doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("a"));
doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("b"));
doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("c"));
doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("d"));
// Bootstrapping nodes should be ignored
doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("e"));
doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("f"));
doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("g"));
doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("h"));

Map<CharSequence, Integer> map = new HashMap<>();
map.put("a", 2);
map.put("b", 10);
map.put("c", 10);
map.put("d", 10);
map.put("e", 2);
map.put("f", 2);
map.put("g", 2);
map.put("h", 2);

// Test partition level key first
doReturn(null).when(reader).getVersionStatus("store", 1);
Expand All @@ -56,10 +66,10 @@ public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThre
public void testDaVinciPushStatusScan(boolean useDaVinciSpecificExecutionStatusForError) {
PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0);
PushStatusStoreReader reader = mock(PushStatusStoreReader.class);
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("b"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("c"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("d"));
doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("a"));
doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("b"));
doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("c"));
doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("d"));

Map<CharSequence, Integer> map = new HashMap<>();
map.put("a", 3);
Expand Down

0 comments on commit d99a2a6

Please sign in to comment.