Skip to content

Commit

Permalink
[controller] Enable configuring a customized health check URL for sto…
Browse files Browse the repository at this point in the history
…rage clusters (linkedin#1273)

Helix has added new functionality that allows configuring a customized health check endpoint, which Helix invokes to determine the health of the cluster. This URL can be configured by setting the controller config `controller.helix.rest.customized.health.url`.
  • Loading branch information
nisargthakkar authored Nov 7, 2024
1 parent ddaa0bb commit 4cf1d0a
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ private ConfigKeys() {
*/
public static final String CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_NAME = "controller.helix.cloud.info.processor.name";

/**
* Base URL for customized health checks triggered by Helix. Default is empty string.
*/
public static final String CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL =
"controller.helix.rest.customized.health.url";

/**
* Whether to enable graveyard cleanup for batch-only store at cluster level. Default is false.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.linkedin.venice.controllerapi;

import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER_ID;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INSTANCES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TO_BE_STOPPED_INSTANCES;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
Expand All @@ -13,16 +17,16 @@ public class AggregatedHealthStatusRequest {

@JsonCreator
public AggregatedHealthStatusRequest(
@JsonProperty("cluster_id") String cluster_id,
@JsonProperty("instances") List<String> instances,
@JsonProperty("to_be_stopped_instances") List<String> to_be_stopped_instances) {
@JsonProperty(CLUSTER_ID) String cluster_id,
@JsonProperty(INSTANCES) List<String> instances,
@JsonProperty(TO_BE_STOPPED_INSTANCES) List<String> to_be_stopped_instances) {
if (cluster_id == null) {
throw new IllegalArgumentException("'cluster_id' is required");
throw new IllegalArgumentException("'" + CLUSTER_ID + "' is required");
}
this.cluster_id = cluster_id;

if (instances == null) {
throw new IllegalArgumentException("'instances' is required");
throw new IllegalArgumentException("'" + INSTANCES + "' is required");
}
this.instances = instances;

Expand All @@ -33,17 +37,17 @@ public AggregatedHealthStatusRequest(
}
}

@JsonProperty("cluster_id")
@JsonProperty(CLUSTER_ID)
public String getClusterId() {
return cluster_id;
}

@JsonProperty("instances")
@JsonProperty(INSTANCES)
public List<String> getInstances() {
return instances;
}

@JsonProperty("to_be_stopped_instances")
@JsonProperty(TO_BE_STOPPED_INSTANCES)
public List<String> getToBeStoppedInstances() {
return to_be_stopped_instances;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public Void call() {
client.addClusterToGrandCluster("venice-controllers");
for (int i = 0; i < 10; i++) {
String clusterName = "cluster-" + i;
client.createVeniceStorageCluster(clusterName, new ClusterConfig(clusterName));
client.createVeniceStorageCluster(clusterName, new ClusterConfig(clusterName), null);
client.addClusterToGrandCluster(clusterName);
client.addVeniceStorageClusterToControllerCluster(clusterName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import java.util.Map;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.RESTConfig;


/**
Expand Down Expand Up @@ -31,8 +32,9 @@ public interface HelixAdminClient {
* Create and configure the Venice storage cluster.
* @param clusterName of the Venice storage cluster.
* @param clusterConfig {@link ClusterConfig} for the new cluster.
* @param restConfig {@link RESTConfig} for the new cluster.
*/
void createVeniceStorageCluster(String clusterName, ClusterConfig clusterConfig);
void createVeniceStorageCluster(String clusterName, ClusterConfig clusterConfig, RESTConfig restConfig);

/**
* Check if the given Venice storage cluster's cluster resource is in the Venice controller cluster.
Expand Down Expand Up @@ -67,6 +69,13 @@ public interface HelixAdminClient {
*/
void updateClusterConfigs(String clusterName, ClusterConfig clusterConfig);

/**
 * Update the Helix REST configs of the given cluster.
 * @param clusterName of the cluster to be updated.
 * @param restConfig {@link RESTConfig} holding the REST configs to apply to the cluster.
 */
void updateRESTConfigs(String clusterName, RESTConfig restConfig);

/**
* Disable or enable a list of partitions on an instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_NAME;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_SOURCES;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_PROVIDER;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_INSTANCE_TAG_LIST;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_JETTY_CONFIG_OVERRIDE_PREFIX;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_MIN_SCHEMA_COUNT_TO_KEEP;
Expand Down Expand Up @@ -368,6 +369,8 @@ public class VeniceControllerClusterConfig {
private final boolean storageClusterHelixCloudEnabled;
private final CloudConfig helixCloudConfig;

private final String helixRestCustomizedHealthUrl;

private final boolean usePushStatusStoreForIncrementalPushStatusReads;

private final long metaStoreWriterCloseTimeoutInMS;
Expand Down Expand Up @@ -930,6 +933,8 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
helixCloudConfig = null;
}

this.helixRestCustomizedHealthUrl = props.getString(CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL, "");

this.unregisterMetricForDeletedStoreEnabled = props.getBoolean(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, false);
this.identityParserClassName = props.getString(IDENTITY_PARSER_CLASS, DefaultIdentityParser.class.getName());
this.storeGraveyardCleanupEnabled = props.getBoolean(CONTROLLER_STORE_GRAVEYARD_CLEANUP_ENABLED, false);
Expand Down Expand Up @@ -1580,6 +1585,10 @@ public CloudConfig getHelixCloudConfig() {
return helixCloudConfig;
}

/**
 * @return the value of the {@code controller.helix.rest.customized.health.url} controller config, or an empty
 *         string when that config is not set.
 */
public String getHelixRestCustomizedHealthUrl() {
return helixRestCustomizedHealthUrl;
}

public boolean usePushStatusStoreForIncrementalPush() {
return usePushStatusStoreForIncrementalPushStatusReads;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -6298,7 +6299,13 @@ private void setupStorageClusterAsNeeded(String clusterName) {
helixClusterConfig.setTopology("/" + HelixUtils.TOPOLOGY_CONSTRAINT);
helixClusterConfig.setFaultZoneType(HelixUtils.TOPOLOGY_CONSTRAINT);

helixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig);
RESTConfig restConfig = null;
if (!StringUtils.isEmpty(clusterConfigs.getHelixRestCustomizedHealthUrl())) {
restConfig = new RESTConfig(clusterName);
restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, clusterConfigs.getHelixRestCustomizedHealthUrl());
}

helixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig, restConfig);
}
if (!helixAdminClient.isClusterInGrandCluster(clusterName)) {
helixAdminClient.addClusterToGrandCluster(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -114,10 +115,10 @@ public void createVeniceControllerCluster() {
}

/**
* @see HelixAdminClient#createVeniceStorageCluster(String, ClusterConfig)
* @see HelixAdminClient#createVeniceStorageCluster(String, ClusterConfig, RESTConfig)
*/
@Override
public void createVeniceStorageCluster(String clusterName, ClusterConfig helixClusterConfig) {
public void createVeniceStorageCluster(String clusterName, ClusterConfig helixClusterConfig, RESTConfig restConfig) {
boolean success = RetryUtils.executeWithMaxAttempt(() -> {
if (!isVeniceStorageClusterCreated(clusterName)) {
if (!helixAdmin.addCluster(clusterName, false)) {
Expand All @@ -130,6 +131,10 @@ public void createVeniceStorageCluster(String clusterName, ClusterConfig helixCl
if (clusterConfig.isStorageClusterHelixCloudEnabled()) {
helixAdmin.addCloudConfig(clusterName, clusterConfig.getHelixCloudConfig());
}

if (restConfig != null) {
updateRESTConfigs(clusterName, restConfig);
}
}
return true;
}, 3, Duration.ofSeconds(5), Collections.singletonList(Exception.class));
Expand Down Expand Up @@ -215,6 +220,17 @@ public void updateClusterConfigs(String clusterName, ClusterConfig clusterConfig
helixAdmin.setConfig(configScope, helixClusterProperties);
}

/**
 * Applies the simple fields of the given {@link RESTConfig} to the cluster as REST-scoped Helix configs.
 *
 * @see HelixAdminClient#updateRESTConfigs(String, RESTConfig)
 */
@Override
public void updateRESTConfigs(String clusterName, RESTConfig restConfig) {
  // Scope the update to the REST configs of the target cluster.
  HelixConfigScopeBuilder restScopeBuilder = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.REST);
  HelixConfigScope restScope = restScopeBuilder.forCluster(clusterName).build();

  // Hand Helix a copy of the simple fields, detached from the RESTConfig's underlying record.
  Map<String, String> restProperties = new HashMap<>(restConfig.getRecord().getSimpleFields());

  helixAdmin.setConfig(restScope, restProperties);
}

/**
* @see HelixAdminClient#enablePartition(boolean, String, String, String, List)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_NAME;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_SOURCES;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_PROVIDER;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_MODE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_SSL_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_STORAGE_CLUSTER_HELIX_CLOUD_ENABLED;
Expand Down Expand Up @@ -283,4 +284,15 @@ private void validateCloudConfig(
assertEquals(cloudConfig.getCloudInfoProcessorName(), processorName);
assertEquals(cloudConfig.getCloudInfoSources(), cloudInfoSources);
}

/**
 * Checks that a health URL supplied through {@code CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL} is exposed unchanged
 * by {@code getHelixRestCustomizedHealthUrl()}.
 */
@Test
public void testHelixRestCustomizedHealthUrl() {
  final String expectedHealthUrl = "http://localhost:8080/health";

  Properties controllerProps = getBaseSingleRegionProperties(false);
  controllerProps.setProperty(CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL, expectedHealthUrl);
  VeniceProperties veniceProps = new VeniceProperties(controllerProps);

  String actualHealthUrl = new VeniceControllerClusterConfig(veniceProps).getHelixRestCustomizedHealthUrl();
  assertEquals(actualHealthUrl, expectedHealthUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.RESTConfig;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -120,25 +121,35 @@ public void testCreateVeniceStorageCluster() {
when(mockMultiClusterConfigs.getControllerConfig(clusterName)).thenReturn(mockClusterConfig);

doReturn(true).when(mockHelixAdmin).addCluster(clusterName, false);
doCallRealMethod().when(zkHelixAdminClient).createVeniceStorageCluster(any(), any());
doCallRealMethod().when(zkHelixAdminClient).createVeniceStorageCluster(any(), any(), any());

// When the cluster is not Helix cloud enabled
ClusterConfig helixClusterConfig = mock(ClusterConfig.class);
zkHelixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig);
zkHelixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig, null);

verify(zkHelixAdminClient).updateClusterConfigs(clusterName, helixClusterConfig);
verify(mockHelixAdmin, never()).addCloudConfig(any(), any());
verify(zkHelixAdminClient, never()).updateRESTConfigs(any(), any());

clearInvocations(zkHelixAdminClient);

// When the cluster is Helix cloud enabled
doReturn(true).when(mockClusterConfig).isStorageClusterHelixCloudEnabled();
CloudConfig cloudConfig = mock(CloudConfig.class);
doReturn(cloudConfig).when(mockClusterConfig).getHelixCloudConfig();
zkHelixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig);
zkHelixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig, null);

verify(zkHelixAdminClient).updateClusterConfigs(clusterName, helixClusterConfig);
verify(mockHelixAdmin).addCloudConfig(clusterName, cloudConfig);
verify(zkHelixAdminClient, never()).updateRESTConfigs(any(), any());

clearInvocations(zkHelixAdminClient, mockHelixAdmin);
doReturn(false).when(mockClusterConfig).isStorageClusterHelixCloudEnabled();

RESTConfig restConfig = mock(RESTConfig.class);
zkHelixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig, restConfig);

verify(zkHelixAdminClient).updateClusterConfigs(clusterName, helixClusterConfig);
verify(mockHelixAdmin, never()).addCloudConfig(any(), any());
verify(zkHelixAdminClient).updateRESTConfigs(clusterName, restConfig);
}

@Test
Expand Down Expand Up @@ -183,4 +194,31 @@ public void testUpdateClusterConfigs() {

zkHelixAdminClient.updateClusterConfigs(clusterName, clusterConfig);
}

/**
 * Verifies that {@code updateRESTConfigs} forwards every simple field of the {@link RESTConfig} to the mocked Helix
 * admin's {@code setConfig}, using a REST-scoped {@link HelixConfigScope} for the target cluster.
 */
@Test
public void testUpdateRESTConfigs() {
  doCallRealMethod().when(zkHelixAdminClient).updateRESTConfigs(anyString(), any());

  String clusterName = "testCluster";
  String restUrl = "http://localhost:8080";
  RESTConfig restConfig = new RESTConfig(clusterName);

  restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, restUrl);
  // An extra, arbitrary simple field: checks that all simple fields are forwarded, not only the health URL.
  restConfig.getRecord().setSimpleField("FIELD1", "VALUE1");

  // The argument expectations are evaluated when the mocked admin receives the setConfig call.
  doAnswer(invocation -> {
    HelixConfigScope scope = invocation.getArgument(0);
    Map<String, String> restProps = invocation.getArgument(1);

    assertEquals(scope.getType(), HelixConfigScope.ConfigScopeProperty.REST);
    assertEquals(scope.getClusterName(), clusterName);
    assertEquals(restProps.size(), 2);
    assertEquals(restProps.get(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL.name()), restUrl);
    assertEquals(restProps.get("FIELD1"), "VALUE1");

    return null;
  }).when(mockHelixAdmin).setConfig(any(), any());

  zkHelixAdminClient.updateRESTConfigs(clusterName, restConfig);

  // All assertions above live inside the answer, so without this check the test would pass vacuously if
  // updateRESTConfigs never reached setConfig.
  verify(mockHelixAdmin).setConfig(any(), any());
}
}

0 comments on commit 4cf1d0a

Please sign in to comment.