Skip to content

Commit

Permalink
[controller] Add support for controller instance tagging (#1190)
Browse files Browse the repository at this point in the history
* Add ability to set tag on controller instances

* Fix tagging and test

* Fix imports

* Make sure to revert new helix admin

* Don't close twice
  • Loading branch information
kvargha authored Sep 20, 2024
1 parent 262101c commit 5619993
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ private ConfigKeys() {
public static final String CONTROLLER_CLUSTER_ZK_ADDRESSS = "controller.cluster.zk.address";
// Name of the Helix cluster for controllers
public static final String CONTROLLER_CLUSTER = "controller.cluster.name";
// Comma-separated list of tags to assign to a controller instance (e.g. "GENERAL,TEST")
public static final String CONTROLLER_INSTANCE_TAG_LIST = "controller.instance.tag.list";

/** List of forbidden admin paths */
public static final String CONTROLLER_DISABLED_ROUTES = "controller.cluster.disabled.routes";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.linkedin.venice.ConfigKeys.CLUSTER_TO_D2;
import static com.linkedin.venice.ConfigKeys.CLUSTER_TO_SERVER_D2;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_ADD_VERSION_VIA_ADMIN_PROTOCOL;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_INSTANCE_TAG_LIST;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_SSL_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_SYSTEM_SCHEMA_CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
Expand Down Expand Up @@ -92,6 +93,7 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
properties.put(ADMIN_HELIX_MESSAGING_CHANNEL_ENABLED, true);
}
properties.put(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, true);
properties.put(CONTROLLER_INSTANCE_TAG_LIST, "GENERAL,TEST");
controllerProps = new VeniceProperties(properties);
helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), clusterName);
controllerConfig = new VeniceControllerClusterConfig(controllerProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2054,4 +2054,16 @@ public void testRaceConditionFixForKillOfflinePushAndVersionSwap(boolean isKillO
stopParticipant(newNodeId);
}

/**
 * Verifies that, for each of the tags "GENERAL" and "TEST", exactly one instance in the
 * "venice-controllers" cluster carries that tag.
 * NOTE(review): presumably these are the tags supplied through CONTROLLER_INSTANCE_TAG_LIST by the
 * shared test setup; confirm against the base test class.
 */
@Test
public void testInstanceTagging() {
  final String controllerCluster = "venice-controllers";
  final List<String> expectedTags = Arrays.asList("GENERAL", "TEST");

  expectedTags.forEach(tag -> {
    // Ask Helix which instances of the controller cluster carry this tag.
    List<String> taggedInstances =
        veniceAdmin.getHelixAdmin().getInstancesInClusterWithTag(controllerCluster, tag);
    Assert.assertEquals(taggedInstances.size(), 1);
  });
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,9 @@ void createVeniceStorageClusterResources(
* Release resources.
*/
void close();

/**
 * Adds a tag to an instance.
 *
 * @param clusterName name of the cluster that the instance belongs to
 * @param instanceName name of the instance to tag
 * @param tag the tag to add to the instance
 */
void addInstanceTag(String clusterName, String instanceName, String tag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_ENABLE_DISABLED_REPLICA_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_ENFORCE_SSL;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HAAS_SUPER_CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_INSTANCE_TAG_LIST;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_IN_AZURE_FABRIC;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_JETTY_CONFIG_OVERRIDE_PREFIX;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_MIN_SCHEMA_COUNT_TO_KEEP;
Expand Down Expand Up @@ -223,6 +224,7 @@ public class VeniceControllerClusterConfig {
// Name of the Helix cluster for controllers
private final String controllerClusterName;
private final String controllerClusterZkAddress;
private final List<String> controllerInstanceTagList;
private final boolean multiRegion;
private final boolean parent;
private final ParentControllerRegionState parentControllerRegionState;
Expand Down Expand Up @@ -635,6 +637,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
*/
this.adminCheckReadMethodForKafka = props.getBoolean(ADMIN_CHECK_READ_METHOD_FOR_KAFKA, true);
this.controllerClusterName = props.getString(CONTROLLER_CLUSTER, "venice-controllers");
this.controllerInstanceTagList = props.getList(CONTROLLER_INSTANCE_TAG_LIST, Collections.emptyList());
this.controllerClusterReplica = props.getInt(CONTROLLER_CLUSTER_REPLICA, 3);
this.controllerClusterZkAddress = props.getString(CONTROLLER_CLUSTER_ZK_ADDRESSS, getZkAddress());
this.parent = props.getBoolean(CONTROLLER_PARENT_MODE, false);
Expand Down Expand Up @@ -1161,6 +1164,10 @@ public String getControllerClusterName() {
return controllerClusterName;
}

/**
 * @return the controller instance tags read from {@code CONTROLLER_INSTANCE_TAG_LIST}; an empty
 *         list when that property is not set
 */
public List<String> getControllerInstanceTagList() {
  return this.controllerInstanceTagList;
}

/**
 * @return the ZooKeeper address of the controller cluster, read from
 *         {@code CONTROLLER_CLUSTER_ZK_ADDRESSS} and defaulting to {@code getZkAddress()}
 */
public String getControllerClusterZkAddress() {
  return this.controllerClusterZkAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,8 @@ public int getDefaultMaxRecordSizeBytes() {
public long getServiceDiscoveryRegistrationRetryMS() {
  // Pure delegation: the value is owned by the config returned from getCommonConfig().
  return this.getCommonConfig().getServiceDiscoveryRegistrationRetryMS();
}

/**
 * @return the controller instance tag list held by the config returned from
 *         {@code getCommonConfig()}
 */
public List<String> getControllerInstanceTagList() {
  return this.getCommonConfig().getControllerInstanceTagList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ public VeniceHelixAdmin(
if (!multiClusterConfigs.getControllerConfig(clusterName).isErrorLeaderReplicaFailOverEnabled()) {
continue;
}

HelixLiveInstanceMonitor liveInstanceMonitor = new HelixLiveInstanceMonitor(this.zkClient, clusterName);
DisabledPartitionStats disabledPartitionStats = new DisabledPartitionStats(metricsRepository, clusterName);
disabledPartitionStatMap.put(clusterName, disabledPartitionStats);
Expand Down Expand Up @@ -793,6 +794,13 @@ private synchronized void connectToControllerCluster() {
}
controllerClusterKeyBuilder = new PropertyKey.Builder(tempManager.getClusterName());
helixManager = tempManager;

List<String> instanceTagList = multiClusterConfigs.getControllerInstanceTagList();
for (String instanceTag: instanceTagList) {
helixAdminClient.addInstanceTag(controllerClusterName, helixManager.getInstanceName(), instanceTag);
}
LOGGER.info("Connected to controller cluster {} with controller {}", controllerClusterName, controllerName);

}

public ZkClient getZkClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,4 +314,12 @@ public void resetPartition(
public void close() {
  // Releasing this client's resources amounts to closing the underlying helixAdmin.
  this.helixAdmin.close();
}

/**
 * Adds {@code tag} to the instance {@code instanceName} of cluster {@code clusterName} by
 * delegating to {@code helixAdmin}.
 *
 * @param clusterName name of the cluster that the instance belongs to
 * @param instanceName name of the instance to tag
 * @param tag the tag to add to the instance
 * @see HelixAdminClient#addInstanceTag(String, String, String)
 */
@Override
public void addInstanceTag(String clusterName, String instanceName, String tag) {
  helixAdmin.addInstanceTag(clusterName, instanceName, tag);
}
}

0 comments on commit 5619993

Please sign in to comment.