Skip to content

Commit

Permalink
[server][metrics] Add new metric which distinguishes a serving partit…
Browse files Browse the repository at this point in the history
…ion from a catching up one (#1142)

* [server][metrics] Add new metric which distinguishes a serving partition from a catching up one.

Additionally mutes the metric of whichever one isn't getting updated
  • Loading branch information
ZacAttack committed Sep 4, 2024
1 parent 8d220c5 commit 4fdaac5
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2107,14 +2107,16 @@ protected void recordHeartbeatReceived(
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp);
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isWaitingForReplicationLag());
} else {
heartbeatMonitoringService.recordFollowerHeartbeat(
storeName,
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp);
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isWaitingForReplicationLag());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -41,8 +43,8 @@ public class HeartbeatMonitoringService extends AbstractVeniceService {
private final String localRegionName;

// store -> version -> partition -> region -> timestamp
private final Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> followerHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> leaderHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderHeartbeatTimeStamps;
HeartbeatVersionedStats versionStatsReporter;

public HeartbeatMonitoringService(
Expand All @@ -65,31 +67,31 @@ public HeartbeatMonitoringService(
}

private synchronized void initializeEntry(
Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Version version,
int partition,
boolean isFollower) {
// We don't monitor heartbeat lag for non hybrid versions
// We don't monitor heartbeat lag for non-hybrid versions
if (version.getHybridStoreConfig() == null) {
return;
}
heartbeatTimestamps.computeIfAbsent(version.getStoreName(), storeKey -> new VeniceConcurrentHashMap<>())
.computeIfAbsent(version.getNumber(), versionKey -> new VeniceConcurrentHashMap<>())
.computeIfAbsent(partition, partitionKey -> {
Map<String, Long> regionTimestamps = new VeniceConcurrentHashMap<>();
Map<String, Pair<Long, Boolean>> regionTimestamps = new VeniceConcurrentHashMap<>();
if (version.isActiveActiveReplicationEnabled() && !isFollower) {
for (String region: regionNames) {
regionTimestamps.put(region, System.currentTimeMillis());
regionTimestamps.put(region, new MutablePair<>(System.currentTimeMillis(), false));
}
} else {
regionTimestamps.put(localRegionName, System.currentTimeMillis());
regionTimestamps.put(localRegionName, new MutablePair<>(System.currentTimeMillis(), false));
}
return regionTimestamps;
});
}

private synchronized void removeEntry(
Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Version version,
int partition) {
heartbeatTimestamps.computeIfPresent(version.getStoreName(), (storeKey, versionMap) -> {
Expand Down Expand Up @@ -155,9 +157,16 @@ public void stopInner() throws Exception {
* @param partition the partition this heartbeat is for
* @param region the region this heartbeat is from
* @param timestamp the time of this heartbeat
* @param isReadyToServe has this partition been marked ready to serve? This determines how the metric is reported
*/
public void recordLeaderHeartbeat(String store, int version, int partition, String region, Long timestamp) {
recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps);
public void recordLeaderHeartbeat(
String store,
int version,
int partition,
String region,
Long timestamp,
boolean isReadyToServe) {
recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps, isReadyToServe);
}

/**
Expand All @@ -168,9 +177,16 @@ public void recordLeaderHeartbeat(String store, int version, int partition, Stri
* @param partition the partition this heartbeat is for
* @param region the region this heartbeat is from
* @param timestamp the time of this heartbeat
* @param isReadyToServe has this partition been marked ready to serve? This determines how the metric is reported
*/
public void recordFollowerHeartbeat(String store, int version, int partition, String region, Long timestamp) {
recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps);
public void recordFollowerHeartbeat(
String store,
int version,
int partition,
String region,
Long timestamp,
boolean isReadyToServe) {
recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps, isReadyToServe);
}

private void recordHeartbeat(
Expand All @@ -179,12 +195,13 @@ private void recordHeartbeat(
int partition,
String region,
Long timestamp,
Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> heartbeatTimestamps) {
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
boolean isReadyToServe) {
if (region != null) {
heartbeatTimestamps.computeIfPresent(store, (storeKey, perVersionMap) -> {
perVersionMap.computeIfPresent(version, (versionKey, perPartitionMap) -> {
perPartitionMap.computeIfPresent(partition, (partitionKey, perRegionMap) -> {
perRegionMap.put(region, timestamp);
perRegionMap.put(region, new MutablePair<>(timestamp, isReadyToServe));
return perRegionMap;
});
return perPartitionMap;
Expand All @@ -194,43 +211,49 @@ private void recordHeartbeat(
}
}

protected Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> getLeaderHeartbeatTimeStamps() {
protected Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> getLeaderHeartbeatTimeStamps() {
return leaderHeartbeatTimeStamps;
}

protected Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> getFollowerHeartbeatTimeStamps() {
protected Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> getFollowerHeartbeatTimeStamps() {
return followerHeartbeatTimeStamps;
}

protected void recordLags(
Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
ReportLagFunction lagFunction) {
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Long>>>> storeName: heartbeatTimestamps.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Long>>> version: storeName.getValue().entrySet()) {
for (Map.Entry<Integer, Map<String, Long>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Long> region: partition.getValue().entrySet()) {
lagFunction.apply(storeName.getKey(), version.getKey(), region.getKey(), region.getValue());
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestamps
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
lagFunction.apply(
storeName.getKey(),
version.getKey(),
region.getKey(),
region.getValue().getLeft(),
region.getValue().getRight());
}
}
}
}
}

protected void record() {

recordLags(
leaderHeartbeatTimeStamps,
((storeName, version, region, heartbeatTs) -> versionStatsReporter
((storeName, version, region, heartbeatTs, isReadyToServe) -> versionStatsReporter
.recordLeaderLag(storeName, version, region, heartbeatTs)));
recordLags(
followerHeartbeatTimeStamps,
((storeName, version, region, heartbeatTs) -> versionStatsReporter
.recordFollowerLag(storeName, version, region, heartbeatTs)));
((storeName, version, region, heartbeatTs, isReadyToServe) -> versionStatsReporter
.recordFollowerLag(storeName, version, region, heartbeatTs, isReadyToServe)));
}

@FunctionalInterface
interface ReportLagFunction {
void apply(String storeName, int version, String region, long lag);
void apply(String storeName, int version, String region, long lag, boolean isReadyToServe);
}

private class HeartbeatReporterThread extends Thread {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@


public class HeartbeatStat {
Map<String, WritePathLatencySensor> leaderSensors = new VeniceConcurrentHashMap<>();
Map<String, WritePathLatencySensor> followerSensors = new VeniceConcurrentHashMap<>();
Map<String, WritePathLatencySensor> readyToServeLeaderSensors = new VeniceConcurrentHashMap<>();
Map<String, WritePathLatencySensor> readyToServeFollowerSensors = new VeniceConcurrentHashMap<>();
Map<String, WritePathLatencySensor> catchingUpFollowerSensors = new VeniceConcurrentHashMap<>();
WritePathLatencySensor defaultSensor;

public HeartbeatStat(MetricConfig metricConfig, Set<String> regions) {
Expand All @@ -20,30 +21,43 @@ public HeartbeatStat(MetricConfig metricConfig, Set<String> regions) {
*/
MetricsRepository localRepository = new MetricsRepository(metricConfig);
for (String region: regions) {
leaderSensors.put(region, new WritePathLatencySensor(localRepository, metricConfig, "leader-" + region));
followerSensors.put(region, new WritePathLatencySensor(localRepository, metricConfig, "follower-" + region));
readyToServeLeaderSensors
.put(region, new WritePathLatencySensor(localRepository, metricConfig, "leader-" + region));
readyToServeFollowerSensors
.put(region, new WritePathLatencySensor(localRepository, metricConfig, "follower-" + region));
catchingUpFollowerSensors
.put(region, new WritePathLatencySensor(localRepository, metricConfig, "catching-up-follower-" + region));
}
// This is an edge case return that should not happen, but it 'can' happen if a venice server is configured with no
// local fabric in its config. This currently isn't illegal, and probably wasn't made to be illegal so as to
// preserve older behavior. TODO: remove this and make local fabric server name a required config
defaultSensor = new WritePathLatencySensor(localRepository, metricConfig, "default-");
}

public void recordLeaderLag(String region, long startTime) {
public void recordReadyToServeLeaderLag(String region, long startTime) {
long endTime = System.currentTimeMillis();
leaderSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
}

public void recordFollowerLag(String region, long startTime) {
public void recordReadyToServeFollowerLag(String region, long startTime) {
long endTime = System.currentTimeMillis();
followerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
}

public WritePathLatencySensor getLeaderLag(String region) {
return leaderSensors.computeIfAbsent(region, k -> defaultSensor);
public void recordCatchingUpFollowerLag(String region, long startTime) {
long endTime = System.currentTimeMillis();
catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor).record(endTime - startTime, endTime);
}

/**
 * Returns the latency sensor held in {@code readyToServeLeaderSensors} for the given region.
 *
 * If no sensor is mapped for the region yet, {@code computeIfAbsent} maps the region to
 * {@code defaultSensor} and that shared sensor is returned.
 *
 * @param region the region whose leader lag sensor is requested
 * @return the sensor mapped to the region, or {@code defaultSensor} when none was mapped
 */
public WritePathLatencySensor getReadyToServeLeaderLag(String region) {
return readyToServeLeaderSensors.computeIfAbsent(region, k -> defaultSensor);
}

/**
 * Returns the latency sensor held in {@code readyToServeFollowerSensors} for the given region.
 *
 * If no sensor is mapped for the region yet, {@code computeIfAbsent} maps the region to
 * {@code defaultSensor} and that shared sensor is returned.
 *
 * @param region the region whose follower lag sensor is requested
 * @return the sensor mapped to the region, or {@code defaultSensor} when none was mapped
 */
public WritePathLatencySensor getReadyToServeFollowerLag(String region) {
return readyToServeFollowerSensors.computeIfAbsent(region, k -> defaultSensor);
}

public WritePathLatencySensor getFollowerLag(String region) {
return followerSensors.computeIfAbsent(region, k -> defaultSensor);
public WritePathLatencySensor getCatchingUpFollowerLag(String region) {
return catchingUpFollowerSensors.computeIfAbsent(region, k -> defaultSensor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
public class HeartbeatStatReporter extends AbstractVeniceStatsReporter<HeartbeatStat> {
private static final String LEADER_METRIC_PREFIX = "heartbeat_delay_ms_leader-";
private static final String FOLLOWER_METRIC_PREFIX = "heartbeat_delay_ms_follower-";
private static final String CATCHUP_UP_FOLLOWER_METRIC_PREFIX = "catching_up_heartbeat_delay_ms_follower-";
private static final String MAX = "-Max";
private static final String AVG = "-Avg";

Expand All @@ -21,32 +22,48 @@ public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeNa
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}
return getStats().getLeaderLag(region).getMax();
return getStats().getReadyToServeLeaderLag(region).getMax();
}, LEADER_METRIC_PREFIX + region + MAX));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getFollowerLag(region).getMax();
return getStats().getReadyToServeFollowerLag(region).getMax();
}, FOLLOWER_METRIC_PREFIX + region + MAX));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getLeaderLag(region).getAvg();
return getStats().getReadyToServeLeaderLag(region).getAvg();
}, LEADER_METRIC_PREFIX + region + AVG));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getFollowerLag(region).getAvg();
return getStats().getReadyToServeFollowerLag(region).getAvg();
}, FOLLOWER_METRIC_PREFIX + region + AVG));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getCatchingUpFollowerLag(region).getMax();
}, CATCHUP_UP_FOLLOWER_METRIC_PREFIX + region + MAX));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getCatchingUpFollowerLag(region).getAvg();
}, CATCHUP_UP_FOLLOWER_METRIC_PREFIX + region + AVG));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,46 @@
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;


public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats<HeartbeatStat, HeartbeatStatReporter> {
private final Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> leaderMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> followerMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerMonitors;

public HeartbeatVersionedStats(
MetricsRepository metricsRepository,
ReadOnlyStoreRepository metadataRepository,
Supplier<HeartbeatStat> statsInitiator,
StatsSupplier<HeartbeatStatReporter> reporterSupplier,
Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> leaderMonitors,
Map<String, Map<Integer, Map<Integer, Map<String, Long>>>> followerMonitors) {
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMonitors,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerMonitors) {
super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true);
this.leaderMonitors = leaderMonitors;
this.followerMonitors = followerMonitors;
}

public void recordLeaderLag(String storeName, int version, String region, long heartbeatTs) {
getStats(storeName, version).recordLeaderLag(region, heartbeatTs);
getStats(storeName, version).recordReadyToServeLeaderLag(region, heartbeatTs);
}

public void recordFollowerLag(String storeName, int version, String region, long heartbeatTs) {
getStats(storeName, version).recordFollowerLag(region, heartbeatTs);
public void recordFollowerLag(
String storeName,
int version,
String region,
long heartbeatTs,
boolean isReadyToServe) {
// If the partition is ready to serve, report its lag to the main lag metric. Otherwise, report it
// to the catch up metric.
// The metric which isn't updated is squelched by reporting the currentTime (so as to appear caught up and mute
// alerts)
if (isReadyToServe) {
getStats(storeName, version).recordReadyToServeFollowerLag(region, heartbeatTs);
getStats(storeName, version).recordCatchingUpFollowerLag(region, System.currentTimeMillis());
} else {
getStats(storeName, version).recordReadyToServeFollowerLag(region, System.currentTimeMillis());
getStats(storeName, version).recordCatchingUpFollowerLag(region, heartbeatTs);
}
}

@Override
Expand Down
Loading

0 comments on commit 4fdaac5

Please sign in to comment.