diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 3178419089..307c9aeceb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -2107,14 +2107,16 @@ protected void recordHeartbeatReceived( versionNumber, partitionConsumptionState.getPartition(), serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl), - consumerRecord.getValue().producerMetadata.messageTimestamp); + consumerRecord.getValue().producerMetadata.messageTimestamp, + partitionConsumptionState.isWaitingForReplicationLag()); } else { heartbeatMonitoringService.recordFollowerHeartbeat( storeName, versionNumber, partitionConsumptionState.getPartition(), serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl), - consumerRecord.getValue().producerMetadata.messageTimestamp); + consumerRecord.getValue().producerMetadata.messageTimestamp, + partitionConsumptionState.isWaitingForReplicationLag()); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index dfe16d12be..7f23d10a9d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -9,6 +9,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -41,8 +43,8 @@ public class HeartbeatMonitoringService extends AbstractVeniceService { private final String localRegionName; // store -> version -> partition -> region -> timestamp - private final Map>>> followerHeartbeatTimeStamps; - private final Map>>> leaderHeartbeatTimeStamps; + private final Map>>>> followerHeartbeatTimeStamps; + private final Map>>>> leaderHeartbeatTimeStamps; HeartbeatVersionedStats versionStatsReporter; public HeartbeatMonitoringService( @@ -65,31 +67,31 @@ public HeartbeatMonitoringService( } private synchronized void initializeEntry( - Map>>> heartbeatTimestamps, + Map>>>> heartbeatTimestamps, Version version, int partition, boolean isFollower) { - // We don't monitor heartbeat lag for non hybrid versions + // We don't monitor heartbeat lag for non-hybrid versions if (version.getHybridStoreConfig() == null) { return; } heartbeatTimestamps.computeIfAbsent(version.getStoreName(), storeKey -> new VeniceConcurrentHashMap<>()) .computeIfAbsent(version.getNumber(), versionKey -> new VeniceConcurrentHashMap<>()) .computeIfAbsent(partition, partitionKey -> { - Map regionTimestamps = new VeniceConcurrentHashMap<>(); + Map> regionTimestamps = new VeniceConcurrentHashMap<>(); if (version.isActiveActiveReplicationEnabled() && !isFollower) { for (String region: regionNames) { - regionTimestamps.put(region, System.currentTimeMillis()); + regionTimestamps.put(region, new MutablePair<>(System.currentTimeMillis(), false)); } } else { - regionTimestamps.put(localRegionName, System.currentTimeMillis()); + regionTimestamps.put(localRegionName, new MutablePair<>(System.currentTimeMillis(), false)); } return regionTimestamps; }); } private synchronized void removeEntry( - Map>>> heartbeatTimestamps, + Map>>>> heartbeatTimestamps, Version version, int partition) { heartbeatTimestamps.computeIfPresent(version.getStoreName(), (storeKey, versionMap) -> { @@ -155,9 +157,16 @@ public void stopInner() throws Exception { * @param partition the partition this heartbeat is for * @param region the region this heartbeat is from * @param timestamp the time of this heartbeat + * @param isReadyToServe has this partition been marked ready to serve? This determines how the metric is reported */ - public void recordLeaderHeartbeat(String store, int version, int partition, String region, Long timestamp) { - recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps); + public void recordLeaderHeartbeat( + String store, + int version, + int partition, + String region, + Long timestamp, + boolean isReadyToServe) { + recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps, isReadyToServe); } /** @@ -168,9 +177,16 @@ public void recordLeaderHeartbeat(String store, int version, int partition, Stri * @param partition the partition this heartbeat is for * @param region the region this heartbeat is from * @param timestamp the time of this heartbeat + * @param isReadyToServe has this partition been marked ready to serve? This determines how the metric is reported */ - public void recordFollowerHeartbeat(String store, int version, int partition, String region, Long timestamp) { - recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps); + public void recordFollowerHeartbeat( + String store, + int version, + int partition, + String region, + Long timestamp, + boolean isReadyToServe) { + recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps, isReadyToServe); } private void recordHeartbeat( @@ -179,12 +195,13 @@ private void recordHeartbeat( int partition, String region, Long timestamp, - Map>>> heartbeatTimestamps) { + Map>>>> heartbeatTimestamps, + boolean isReadyToServe) { if (region != null) { heartbeatTimestamps.computeIfPresent(store, (storeKey, perVersionMap) -> { perVersionMap.computeIfPresent(version, (versionKey, perPartitionMap) -> { perPartitionMap.computeIfPresent(partition, (partitionKey, perRegionMap) -> { - perRegionMap.put(region, timestamp); + perRegionMap.put(region, new MutablePair<>(timestamp, isReadyToServe)); return perRegionMap; }); return perPartitionMap; @@ -194,22 +211,29 @@ private void recordHeartbeat( } } - protected Map>>> getLeaderHeartbeatTimeStamps() { + protected Map>>>> getLeaderHeartbeatTimeStamps() { return leaderHeartbeatTimeStamps; } - protected Map>>> getFollowerHeartbeatTimeStamps() { + protected Map>>>> getFollowerHeartbeatTimeStamps() { return followerHeartbeatTimeStamps; } protected void recordLags( - Map>>> heartbeatTimestamps, + Map>>>> heartbeatTimestamps, ReportLagFunction lagFunction) { - for (Map.Entry>>> storeName: heartbeatTimestamps.entrySet()) { - for (Map.Entry>> version: storeName.getValue().entrySet()) { - for (Map.Entry> partition: version.getValue().entrySet()) { - for (Map.Entry region: partition.getValue().entrySet()) { - lagFunction.apply(storeName.getKey(), version.getKey(), region.getKey(), region.getValue()); + for (Map.Entry>>>> storeName: heartbeatTimestamps + .entrySet()) { + for (Map.Entry>>> version: storeName.getValue() + .entrySet()) { + for (Map.Entry>> partition: version.getValue().entrySet()) { + for (Map.Entry> region: partition.getValue().entrySet()) { + lagFunction.apply( + storeName.getKey(), + version.getKey(), + region.getKey(), + region.getValue().getLeft(), + region.getValue().getRight()); } } } @@ -217,20 +241,19 @@ protected void recordLags( } protected void record() { - recordLags( leaderHeartbeatTimeStamps, - ((storeName, version, region, heartbeatTs) -> versionStatsReporter + ((storeName, version, region, heartbeatTs, isReadyToServe) -> versionStatsReporter .recordLeaderLag(storeName, version, region, heartbeatTs))); recordLags( followerHeartbeatTimeStamps, - ((storeName, version, region, heartbeatTs) -> versionStatsReporter - .recordFollowerLag(storeName, version, region, heartbeatTs))); + ((storeName, version, region, heartbeatTs, isReadyToServe) -> versionStatsReporter + .recordFollowerLag(storeName, version, region, heartbeatTs, isReadyToServe))); } @FunctionalInterface interface ReportLagFunction { - void apply(String storeName, int version, String region, long lag); + void apply(String storeName, int version, String region, long lag, boolean isReadyToServe); } private class HeartbeatReporterThread extends Thread { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java index 5e36c2d333..fd2a628e3e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStat.java @@ -9,8 +9,9 @@ public class HeartbeatStat { - Map leaderSensors = new VeniceConcurrentHashMap<>(); - Map followerSensors = new VeniceConcurrentHashMap<>(); + Map readyToServeLeaderSensors = new VeniceConcurrentHashMap<>(); + Map readyToServeFollowerSensors = new VeniceConcurrentHashMap<>(); + Map catchingUpFollowerSensors = new VeniceConcurrentHashMap<>(); WritePathLatencySensor defaultSensor; public HeartbeatStat(MetricConfig metricConfig, Set regions) { @@ -20,8 +21,12 @@ public HeartbeatStat(MetricConfig metricConfig, Set regions) { */ MetricsRepository localRepository = new MetricsRepository(metricConfig); for (String region: regions) { - leaderSensors.put(region, new WritePathLatencySensor(localRepository, metricConfig, "leader-" + region)); - followerSensors.put(region, new WritePathLatencySensor(localRepository, metricConfig, "follower-" + region)); + readyToServeLeaderSensors + .put(region, new WritePathLatencySensor(localRepository, metricConfig, "leader-" + region)); + readyToServeFollowerSensors + .put(region, new WritePathLatencySensor(localRepository, metricConfig, "follower-" + region)); + catchingUpFollowerSensors + .put(region, new WritePathLatencySensor(localRepository, metricConfig, "catching-up-follower-" + region)); } // This is an edge case return that should not happen, but it 'can' happen if a venice server is configured with no // local fabric in it's config. This currently isn't illegal, and probably wasn't made to be illegal so as to @@ -29,21 +34,30 @@ public HeartbeatStat(MetricConfig metricConfig, Set regions) { defaultSensor = new WritePathLatencySensor(localRepository, metricConfig, "default-"); } - public void recordLeaderLag(String region, long startTime) { + public void recordReadyToServeLeaderLag(String region, long startTime) { long endTime = System.currentTimeMillis(); - leaderSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); + readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); } - public void recordFollowerLag(String region, long startTime) { + public void recordReadyToServeFollowerLag(String region, long startTime) { long endTime = System.currentTimeMillis(); - followerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); + readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); } - public WritePathLatencySensor getLeaderLag(String region) { - return leaderSensors.computeIfAbsent(region, k -> defaultSensor); + public void recordCatchingUpFollowerLag(String region, long startTime) { + long endTime = System.currentTimeMillis(); + catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime); + } + + public WritePathLatencySensor getReadyToServeLeaderLag(String region) { + return readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor); + } + + public WritePathLatencySensor getReadyToServeFollowerLag(String region) { + return readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor); } - public WritePathLatencySensor getFollowerLag(String region) { - return followerSensors.computeIfAbsent(region, k -> defaultSensor); + public WritePathLatencySensor getCatchingUpFollowerLag(String region) { + return catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStatReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStatReporter.java index ba89b81e9e..42036c7875 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStatReporter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStatReporter.java @@ -11,6 +11,7 @@ public class HeartbeatStatReporter extends AbstractVeniceStatsReporter { private static final String LEADER_METRIC_PREFIX = "heartbeat_delay_ms_leader-"; private static final String FOLLOWER_METRIC_PREFIX = "heartbeat_delay_ms_follower-"; + private static final String CATCHUP_UP_FOLLOWER_METRIC_PREFIX = "catching_up_heartbeat_delay_ms_follower-"; private static final String MAX = "-Max"; private static final String AVG = "-Avg"; @@ -21,7 +22,7 @@ public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeNa if (getStats() == null) { return NULL_INGESTION_STATS.code; } - return getStats().getLeaderLag(region).getMax(); + return getStats().getReadyToServeLeaderLag(region).getMax(); }, LEADER_METRIC_PREFIX + region + MAX)); registerSensor(new AsyncGauge((ignored, ignored2) -> { @@ -29,7 +30,7 @@ public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeNa return NULL_INGESTION_STATS.code; } - return getStats().getFollowerLag(region).getMax(); + return getStats().getReadyToServeFollowerLag(region).getMax(); }, FOLLOWER_METRIC_PREFIX + region + MAX)); registerSensor(new AsyncGauge((ignored, ignored2) -> { @@ -37,7 +38,7 @@ public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeNa return NULL_INGESTION_STATS.code; } - return getStats().getLeaderLag(region).getAvg(); + return getStats().getReadyToServeLeaderLag(region).getAvg(); }, LEADER_METRIC_PREFIX + region + AVG)); registerSensor(new AsyncGauge((ignored, ignored2) -> { @@ -45,8 +46,24 @@ public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeNa return NULL_INGESTION_STATS.code; } - return getStats().getFollowerLag(region).getAvg(); + return getStats().getReadyToServeFollowerLag(region).getAvg(); }, FOLLOWER_METRIC_PREFIX + region + AVG)); + + registerSensor(new AsyncGauge((ignored, ignored2) -> { + if (getStats() == null) { + return NULL_INGESTION_STATS.code; + } + + return getStats().getCatchingUpFollowerLag(region).getMax(); + }, CATCHUP_UP_FOLLOWER_METRIC_PREFIX + region + MAX)); + + registerSensor(new AsyncGauge((ignored, ignored2) -> { + if (getStats() == null) { + return NULL_INGESTION_STATS.code; + } + + return getStats().getCatchingUpFollowerLag(region).getAvg(); + }, CATCHUP_UP_FOLLOWER_METRIC_PREFIX + region + AVG)); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java index dd0fabbb3f..1e303f3e8a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java @@ -7,30 +7,46 @@ import io.tehuti.metrics.MetricsRepository; import java.util.Map; import java.util.function.Supplier; +import org.apache.commons.lang3.tuple.Pair; public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats { - private final Map>>> leaderMonitors; - private final Map>>> followerMonitors; + private final Map>>>> leaderMonitors; + private final Map>>>> followerMonitors; public HeartbeatVersionedStats( MetricsRepository metricsRepository, ReadOnlyStoreRepository metadataRepository, Supplier statsInitiator, StatsSupplier reporterSupplier, - Map>>> leaderMonitors, - Map>>> followerMonitors) { + Map>>>> leaderMonitors, + Map>>>> followerMonitors) { super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true); this.leaderMonitors = leaderMonitors; this.followerMonitors = followerMonitors; } public void recordLeaderLag(String storeName, int version, String region, long heartbeatTs) { - getStats(storeName, version).recordLeaderLag(region, heartbeatTs); + getStats(storeName, version).recordReadyToServeLeaderLag(region, heartbeatTs); } - public void recordFollowerLag(String storeName, int version, String region, long heartbeatTs) { - getStats(storeName, version).recordFollowerLag(region, heartbeatTs); + public void recordFollowerLag( + String storeName, + int version, + String region, + long heartbeatTs, + boolean isReadyToServe) { + // If the partition is ready to serve, report it's lage to the main lag metric. Otherwise, report it + // to the catch up metric. + // The metric which isn't updated is squelched by reporting the currentTime (so as to appear caught up and mute + // alerts) + if (isReadyToServe) { + getStats(storeName, version).recordReadyToServeFollowerLag(region, heartbeatTs); + getStats(storeName, version).recordCatchingUpFollowerLag(region, System.currentTimeMillis()); + } else { + getStats(storeName, version).recordReadyToServeFollowerLag(region, System.currentTimeMillis()); + getStats(storeName, version).recordCatchingUpFollowerLag(region, heartbeatTs); + } } @Override diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java index 70c9d43690..59419920a9 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java @@ -30,7 +30,7 @@ public void testAddLeaderLagMonitor() { new HybridStoreConfigImpl(1L, 1L, 1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_SOP); // Version configs - Version backupVersion = new VersionImpl(TEST_STORE, 1, "1"); // Non hybrid version + Version backupVersion = new VersionImpl(TEST_STORE, 1, "1"); // Non-hybrid version Version currentVersion = new VersionImpl(TEST_STORE, 2, "2"); // hybrid version, active/active Version futureVersion = new VersionImpl(TEST_STORE, 3, "3"); // hybrid version, non AA currentVersion.setHybridStoreConfig(hybridStoreConfig); @@ -57,10 +57,10 @@ public void testAddLeaderLagMonitor() { new HeartbeatMonitoringService(mockMetricsRepository, mockReadOnlyRepository, regions, LOCAL_FABRIC); // Let's emit some heartbeats that don't exist in the registry yet - heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 1, 0, LOCAL_FABRIC, 1000L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1000L); + heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 1, 0, LOCAL_FABRIC, 1000L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1000L, true); // and throw a null at it too for good measure - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, null, 1000L); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, null, 1000L, true); // Since we haven't gotten a signal to handle these heartbeats, we discard them. Assert.assertNull(heartbeatMonitoringService.getLeaderHeartbeatTimeStamps().get(TEST_STORE)); @@ -83,28 +83,28 @@ public void testAddLeaderLagMonitor() { // Follower heartbeats // local fabric heartbeats - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 2, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 2, LOCAL_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 2, LOCAL_FABRIC, 1001L); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 2, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 2, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 2, LOCAL_FABRIC, 1001L, true); // remote fabric heartbeats - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, REMOTE_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, REMOTE_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, REMOTE_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, 1001L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 3, REMOTE_FABRIC, 1001L); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, REMOTE_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, REMOTE_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, REMOTE_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 3, REMOTE_FABRIC, 1001L, true); // bogus heartbeats - heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1002L); - heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, 1002L); + heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1002L, true); + heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, 1002L, true); Assert.assertEquals(heartbeatMonitoringService.getFollowerHeartbeatTimeStamps().size(), 1); // We only expect two entries as version 1 is a non-hybrid version @@ -124,15 +124,17 @@ public void testAddLeaderLagMonitor() { .get(TEST_STORE) .get(futureVersion.getNumber()) .get(1) - .get(LOCAL_FABRIC); - Assert.assertTrue(value == 1001L); + .get(LOCAL_FABRIC) + .getLeft(); + Assert.assertEquals((long) value, 1001L); value = heartbeatMonitoringService.getFollowerHeartbeatTimeStamps() .get(TEST_STORE) .get(futureVersion.getNumber()) .get(1) - .get(REMOTE_FABRIC); - Assert.assertTrue(value == 1001L); + .get(REMOTE_FABRIC) + .getLeft(); + Assert.assertEquals((long) value, 1001L); // Leader state transitions heartbeatMonitoringService.addLeaderLagMonitor(currentVersion, 1); @@ -172,9 +174,9 @@ public void testAddLeaderLagMonitor() { Assert.assertNull( heartbeatMonitoringService.getLeaderHeartbeatTimeStamps().get(TEST_STORE).get(backupVersion.getNumber())); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, 1003L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, 1003L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, 1003L); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, 1003L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, 1003L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, 1003L, true); // make sure leaders are cleared out Assert.assertNull( @@ -204,14 +206,16 @@ public void testAddLeaderLagMonitor() { .get(TEST_STORE) .get(futureVersion.getNumber()) .get(1) - .get(REMOTE_FABRIC); - Assert.assertTrue(value == 1003L); + .get(REMOTE_FABRIC) + .getLeft(); + Assert.assertEquals((long) value, 1003L); value = heartbeatMonitoringService.getFollowerHeartbeatTimeStamps() .get(TEST_STORE) .get(currentVersion.getNumber()) .get(1) - .get(REMOTE_FABRIC); - Assert.assertTrue(value == 1003L); + .get(REMOTE_FABRIC) + .getLeft(); + Assert.assertEquals((long) value, 1003L); // Drop/Error some heartbeatMonitoringService.removeLagMonitor(currentVersion, 0); @@ -219,9 +223,12 @@ public void testAddLeaderLagMonitor() { heartbeatMonitoringService.removeLagMonitor(backupVersion, 2); // Send heartbeats to resources we just dropped - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, backupVersion.getNumber(), 2, LOCAL_FABRIC, 1005L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, currentVersion.getNumber(), 0, LOCAL_FABRIC, 1005L); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, futureVersion.getNumber(), 1, LOCAL_FABRIC, 1005L); + heartbeatMonitoringService + .recordFollowerHeartbeat(TEST_STORE, backupVersion.getNumber(), 2, LOCAL_FABRIC, 1005L, true); + heartbeatMonitoringService + .recordFollowerHeartbeat(TEST_STORE, currentVersion.getNumber(), 0, LOCAL_FABRIC, 1005L, true); + heartbeatMonitoringService + .recordFollowerHeartbeat(TEST_STORE, futureVersion.getNumber(), 1, LOCAL_FABRIC, 1005L, true); Assert.assertNull( heartbeatMonitoringService.getFollowerHeartbeatTimeStamps().get(TEST_STORE).get(backupVersion.getNumber()));