Skip to content

Commit

Permalink
[server] Fix NPE for ServerReadQuotaUsageStats when store has no vers…
Browse files Browse the repository at this point in the history
…ion or traffic (#1291)

* [server] Fix NPE for ServerReadQuotaUsageStats when store has no version or traffic

1. Prevent NPE in ServerReadQuotaUsageStats's async gauges when a store has no version or traffic.
2. Set the backup version from the store's version list in handleStoreChanged, because the old behavior did not set the correct backup version after a server restart.
  • Loading branch information
xunyin8 authored Nov 8, 2024
1 parent 0ea8dce commit 90dd662
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,12 @@ public void handleStoreChanged(Store store) {

List<String> topics =
store.getVersions().stream().map((version) -> version.kafkaTopicName()).collect(Collectors.toList());
int currentVersion = store.getCurrentVersion();
int backupVersion = 0;
for (String topic: topics) {
toBeRemovedTopics.remove(topic);
customizedViewRepository.subscribeRoutingDataChange(topic, this);
int versionNumber = Version.parseVersionFromKafkaTopicName(topic);
try {
/**
* make sure we're up-to-date after registering as a listener
Expand All @@ -427,8 +430,12 @@ public void handleStoreChanged(Store store) {
*
*/
this.onCustomizedViewChange(customizedViewRepository.getPartitionAssignments(topic));
if (versionNumber != currentVersion && versionNumber > backupVersion
&& VersionStatus.isBootstrapCompleted(store.getVersionStatus(versionNumber))) {
backupVersion = versionNumber;
}
} catch (VeniceNoHelixResourceException e) {
Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(topic));
Version version = store.getVersion(versionNumber);
if (version != null && version.getStatus().equals(VersionStatus.ONLINE)) {
/**
* The store metadata believes this version is online, but the partition assignment is not in the
Expand Down Expand Up @@ -462,7 +469,12 @@ public void handleStoreChanged(Store store) {
}
}
removeTopics(toBeRemovedTopics);
stats.setCurrentVersion(store.getName(), store.getCurrentVersion());
if (currentVersion > 0) {
stats.setCurrentVersion(store.getName(), currentVersion);
}
if (backupVersion > 0) {
stats.setBackupVersion(store.getName(), backupVersion);
}
}

private Set<String> getStoreTopics(String storeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ public void setNodeQuotaResponsibility(String storeName, int version, long nodeK
/**
 * Forwards the given version to {@code setCurrentVersion} on the stats object that
 * {@code getStoreStats} returns for the store.
 *
 * @param storeName name of the store whose stats object is looked up
 * @param version version number passed through unchanged
 */
public void setCurrentVersion(String storeName, int version) {
getStoreStats(storeName).setCurrentVersion(version);
}

/**
 * Forwards the given version to {@code setBackupVersion} on the stats object that
 * {@code getStoreStats} returns for the store.
 *
 * @param storeName name of the store whose stats object is looked up
 * @param version version number passed through unchanged
 */
public void setBackupVersion(String storeName, int version) {
getStoreStats(storeName).setBackupVersion(version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ public ServerReadQuotaUsageStats(MetricsRepository metricsRepository, String nam

public void setCurrentVersion(int version) {
int oldCurrentVersion = currentVersion.get();
if (version == oldCurrentVersion) {
if (version != oldCurrentVersion) {
// Defensive coding since set current version can be called multiple times with the same current version
return;
currentVersion.compareAndSet(oldCurrentVersion, version);
}
if (currentVersion.compareAndSet(oldCurrentVersion, version)) {
// Old current version becomes the backup. This should work even if:
// a) we rolled back current version
// b) current version used to be 0
backupVersion.set(oldCurrentVersion);
}

/**
 * Records {@code version} as the backup version when it differs from the value currently held.
 *
 * @param version the backup version number to record
 */
public void setBackupVersion(int version) {
  final int previous = backupVersion.get();
  if (previous == version) {
    // Same value as before; nothing to update.
    return;
  }
  // The update is conditional on the field still holding the value read above; the result is ignored.
  backupVersion.compareAndSet(previous, version);
}

Expand Down Expand Up @@ -114,24 +115,26 @@ private ServerReadQuotaVersionedStats getVersionedStats(int version) {
return versionedStats.computeIfAbsent(version, (ignored) -> new ServerReadQuotaVersionedStats(time, metricConfig));
}

private Double getVersionedRequestedQPS(int version) {
// Package private for testing purpose
final Double getVersionedRequestedQPS(int version) {
if (version < 1) {
return Double.NaN;
}
return versionedStats.get(version).getRequestedQPS();
return getVersionedStats(version).getRequestedQPS();
}

private Double getVersionedRequestedKPS(int version) {
// Package private for testing purpose
final Double getVersionedRequestedKPS(int version) {
if (version < 1) {
return Double.NaN;
}
return versionedStats.get(version).getRequestedKPS();
return getVersionedStats(version).getRequestedKPS();
}

/**
* @return the ratio of the read quota usage to the node's quota responsibility
*/
private Double getReadQuotaUsageRatio() {
final Double getReadQuotaUsageRatio() {
int version = currentVersion.get();
if (version < 1 || !versionedStats.containsKey(version)) {
return Double.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.protocols.VeniceServerResponse;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.response.VeniceReadResponseStatus;
Expand Down Expand Up @@ -831,6 +832,7 @@ private Store setUpStoreMock(
doReturn(versionList).when(store).getVersions();
doReturn(readQuota).when(store).getReadQuotaInCU();
doReturn(readQuotaEnabled).when(store).isStorageNodeReadQuotaEnabled();
doReturn(VersionStatus.ONLINE).when(store).getVersionStatus(anyInt());
return store;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void testAggServerQuotaUsageStats() {
String totalReadQuotaRequestedKPSString = ".total--current_quota_request_key_count.Gauge";
long batchSize = 100;
long batchSize2 = 200;
aggServerQuotaUsageStats.setCurrentVersion(storeName, 1);
aggServerQuotaUsageStats.setBackupVersion(storeName, 1);
aggServerQuotaUsageStats.setCurrentVersion(storeName, 2);
aggServerQuotaUsageStats.setCurrentVersion(storeName2, 1);
aggServerQuotaUsageStats.recordAllowed(storeName, 1, batchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,33 @@ public void testGetReadQuotaUsageRatio() {
Assert.assertTrue(
metricsRepository.getMetric(".test-store--quota_requested_usage_ratio.Gauge").value() <= usageRatio);
}

/**
 * Exercises the versioned QPS/KPS getters and the usage-ratio getter in three stages: before any version
 * is set, after versions are set but nothing is recorded, and after quota responsibility and one allowed
 * request are recorded for the current version.
 */
@Test
public void testGetReadQuotaMetricsWithNoVersionOrRecordings() {
MetricsRepository metricsRepository = new MetricsRepository();
String storeName = "test-store";
int currentVersion = 3;
int backupVersion = 2;
ServerReadQuotaUsageStats stats = new ServerReadQuotaUsageStats(metricsRepository, storeName);
// Stats shouldn't fail if the store doesn't have any versions yet
Assert.assertEquals(stats.getVersionedRequestedQPS(backupVersion), 0d);
Assert.assertEquals(stats.getVersionedRequestedQPS(currentVersion), 0d);
Assert.assertEquals(stats.getVersionedRequestedKPS(backupVersion), 0d);
Assert.assertEquals(stats.getVersionedRequestedKPS(currentVersion), 0d);
Assert.assertEquals(stats.getReadQuotaUsageRatio(), Double.NaN);
// Stats shouldn't fail if there are no recordings yet
stats.setCurrentVersion(currentVersion);
stats.setBackupVersion(backupVersion);
Assert.assertEquals(stats.getVersionedRequestedQPS(backupVersion), 0d);
Assert.assertEquals(stats.getVersionedRequestedQPS(currentVersion), 0d);
Assert.assertEquals(stats.getVersionedRequestedKPS(backupVersion), 0d);
Assert.assertEquals(stats.getVersionedRequestedKPS(currentVersion), 0d);
// Usage ratio is still expected to be NaN here: versions are set, but no quota responsibility is assigned yet
Assert.assertEquals(stats.getReadQuotaUsageRatio(), Double.NaN);
// The replica receives some assignment and traffic for current version
stats.setNodeQuotaResponsibility(currentVersion, 1000);
stats.recordAllowed(currentVersion, 100);
Assert.assertTrue(stats.getReadQuotaUsageRatio() > 0);
Assert.assertTrue(stats.getVersionedRequestedQPS(currentVersion) > 0);
Assert.assertTrue(stats.getVersionedRequestedKPS(currentVersion) > 0);
}
}

0 comments on commit 90dd662

Please sign in to comment.