Add default replica capacity in resource config #2531

Merged
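
In short, this change writes a defaultReplicaCapacityInBytes simple field on each resource's ResourceConfig (bootstrap tooling side) and has HelixClusterManager read it back when it must create a bootstrap replica for a partition that is not yet in the cluster map (FULL_AUTO mode). A minimal sketch of that round trip, assuming an already-connected ConfigAccessor and illustrative clusterName, resourceName, and partitionDiskWeightInGB values:

import org.apache.helix.ConfigAccessor;
import org.apache.helix.model.ResourceConfig;

// Write side (bootstrap tool): store the default replica capacity, in bytes, as a simple field.
long bytesPerGB = 1024L * 1024 * 1024;
ResourceConfig resourceConfig = new ResourceConfig.Builder(resourceName).build();
resourceConfig.putSimpleConfig("defaultReplicaCapacityInBytes",
    String.valueOf(bytesPerGB * partitionDiskWeightInGB));
configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig);

// Read side (HelixClusterManager): fetch the resource config and parse the field when sizing
// a bootstrap replica for a brand-new partition.
ResourceConfig fetched = configAccessor.getResourceConfig(clusterName, resourceName);
long defaultReplicaCapacity = Long.parseLong(fetched.getSimpleConfig("defaultReplicaCapacityInBytes"));
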
@@ -78,6 +78,7 @@ public class ClusterMapUtils {
static final String REPLICAS_STR_SEPARATOR = ":";
static final String REPLICAS_CAPACITY_STR = "replicaCapacityInBytes";
static final String REPLICA_TYPE_STR = "replicaType";
static final String DEFAULT_REPLICA_CAPACITY_STR = "defaultReplicaCapacityInBytes";
public static final String SSL_PORT_STR = "sslPort";
public static final String HTTP2_PORT_STR = "http2Port";
static final String RACKID_STR = "rackId";
@@ -1173,7 +1173,10 @@ private void setResourceConfig(String dcName, String resourceName, ConfigAccessor
clusterConfigFields.partitionDiskWeightInGB + PARTITION_BUFFER_CAPACITY_FOR_INDEX_FILES_IN_GB));

// 3. Update resource configs in helix
final long GB = 1024 * 1024 * 1024;
resourceConfig.setPartitionCapacityMap(partitionCapacityMap);
resourceConfig.putSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR,
String.valueOf(GB * clusterConfigFields.partitionDiskWeightInGB));
configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig);
info("Updated resource config/partition weights for resource {}. Partition weights: {}", resourceName,
resourceConfig.getPartitionCapacityMap());
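
The stored value is simply the configured partition disk weight converted to bytes: with a hypothetical partitionDiskWeightInGB of 100, the resource config would carry defaultReplicaCapacityInBytes = 100 * 1024^3 = 107374182400.
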
@@ -40,6 +40,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -50,6 +51,7 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.spectator.RoutingTableSnapshot;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -114,6 +116,10 @@ public class HelixClusterManager implements ClusterMap {
final HelixClusterManagerMetrics helixClusterManagerMetrics;
private HelixAggregatedViewClusterInfo helixAggregatedViewClusterInfo = null;

// Resource configs, used in FULL_AUTO mode. We probably only need to fetch the one resource config for the resource
// this data node belongs to (this data node has to be a server node).
private final ConcurrentHashMap<String, ResourceConfig> resourceConfigs = new ConcurrentHashMap<>();

/**
* Instantiate a HelixClusterManager.
* @param clusterMapConfig the {@link ClusterMapConfig} associated with this manager.
@@ -785,28 +791,30 @@ private ReplicaId getBootstrapReplicaInSemiAuto(String partitionIdStr, DataNodeId
* @return {@link ReplicaId} if there is a new replica satisfying given partition and data node. {@code null} otherwise.
*/
private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeId dataNodeId) {
long replicaCapacity = 0;
AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId);
if (disk == null) {
logger.error("No Disk is available to host bootstrap replica. Cannot create the replica.");
return null;
}
long replicaCapacity = 0;
boolean decreasedDiskSpace = false;
try {
AmbryPartition mappedPartition =
new AmbryPartition(Long.parseLong(partitionIdStr), clusterMapConfig.clusterMapDefaultPartitionClass,
helixClusterManagerQueryHelper);
AmbryPartition currentPartition =
partitionNameToAmbryPartition.putIfAbsent(mappedPartition.toPathString(), mappedPartition);
if (currentPartition == null) {
// If the partition is new, we get the default replica capacity from resource config
logger.info("Partition {} is currently not present in cluster map, a new partition is created", partitionIdStr);
currentPartition = mappedPartition;
replicaCapacity = DEFAULT_REPLICA_CAPACITY_IN_BYTES;
replicaCapacity = getReplicaCapacityFromResourceConfig(partitionIdStr, dataNodeId);
} else {
// For existing partitions, we should already have other replicas in the map
replicaCapacity = currentPartition.getReplicaIds().get(0).getCapacityInBytes();
}
// Update disk usage
// TODO: Add a metric for this
disk.decreaseAvailableSpaceInBytes(replicaCapacity);
decreasedDiskSpace = true;
AmbryServerReplica replica =
new AmbryServerReplica(clusterMapConfig, currentPartition, disk, true, replicaCapacity,
ReplicaSealStatus.NOT_SEALED);
@@ -817,15 +825,56 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeId
dataNodeId, e);
// We decreased the available space on the disk because we expected it to host the replica. Since
// bootstrapping the replica failed, increase the available disk space back.
disk.increaseAvailableSpaceInBytes(replicaCapacity);
if (decreasedDiskSpace) {
disk.increaseAvailableSpaceInBytes(replicaCapacity);
}
return null;
}
}

/**
* Get the default replica capacity from the resource config. A data node belongs to only one resource, so if we are
* creating a new partition for this data node, we can get the default capacity from that resource's config.
* @param partitionIdStr The partition id in string.
* @param dataNodeId The data node to create a new replica in.
* @return The default replica capacity
*/
private long getReplicaCapacityFromResourceConfig(String partitionIdStr, DataNodeId dataNodeId) {
String instanceName = getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort());
InstanceConfig instanceConfig = localHelixAdmin.getInstanceConfig(clusterName, instanceName);
if (instanceConfig == null) {
throw new IllegalArgumentException("Instance config for " + instanceName + " doesn't exist");
}
String dcName = dataNodeId.getDatacenterName();
List<String> tags = instanceConfig.getTags();
// we should only have one tag
String tag = tags.get(0);
String resourceName = dcToTagToResourceProperty.get(dcName).get(tag).name;
if (!resourceConfigs.containsKey(resourceName)) {
ConfigAccessor configAccessor = null;
if (clusterMapConfig.clusterMapUseAggregatedView) {
configAccessor = helixAggregatedViewClusterInfo.helixManager.getConfigAccessor();
} else {
configAccessor = ((HelixDcInfo) dcToDcInfo.get(dcName)).helixManager.getConfigAccessor();
}
logger.info("Fetching resource config for {} to create bootstrap replica for partition {}", resourceName,
partitionIdStr);
ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, resourceName);
if (resourceConfig != null) {
resourceConfigs.putIfAbsent(resourceName, resourceConfig);
}
}
ResourceConfig resourceConfig = resourceConfigs.get(resourceName);
if (resourceConfig == null || resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR) == null) {
throw new IllegalArgumentException("Missing default replica capacity from resource " + resourceName
+ " when creating bootstrap replica for partition " + partitionIdStr);
}
return Long.parseLong(resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR));
}

/**
* Get a disk with maximum available space for bootstrapping replica in Full auto mode. This method is synchronized
* since it can be queried concurrently when multiple replicas are bootstrapped.
* TODO Check for disk health as well (Will be added in next PR)
* @param dataNode the {@link DataNodeId} on which disk is needed
* @return {@link AmbryDisk} which has maximum available or free capacity. If none of the disks have free space,
* returns null.
@@ -43,6 +43,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.model.IdealState;
@@ -638,16 +639,40 @@ public void getNewReplicaInFullAutoBasicTest() throws Exception {
}

// Set ZNRecord is NULL in Helix PropertyStore
HelixClusterManager helixClusterManager = new HelixClusterManager(clusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, null, useAggregatedView), metricRegistry);
MockHelixManagerFactory helixFactory = new MockHelixManagerFactory(helixCluster, null, null, useAggregatedView);
HelixClusterManager helixClusterManager =
new HelixClusterManager(clusterMapConfig, selfInstanceName, helixFactory, metricRegistry);

// Case 1. We are creating a bootstrap replica for an existing partition
PartitionId partitionOfNewReplica = helixClusterManager.getAllPartitionIds(null).get(0);
long expectedCapacity = partitionOfNewReplica.getReplicaIds().get(0).getCapacityInBytes();
AmbryDataNode ambryDataNode = helixClusterManager.getDataNodeId(currentNode.getHostname(), currentNode.getPort());
// 1. Verify bootstrap replica should be successful
assertNotNull("New replica should be created successfully",
helixClusterManager.getBootstrapReplica(partitionOfNewReplica.toPathString(), ambryDataNode));
ReplicaId bootstrapReplica =
helixClusterManager.getBootstrapReplica(partitionOfNewReplica.toPathString(), ambryDataNode);
assertNotNull("New replica should be created successfully", bootstrapReplica);
assertEquals("Bootstrap replica of existing partition should has peers' capacity", expectedCapacity,
bootstrapReplica.getCapacityInBytes());
assertEquals("There should be exactly one entry in bootstrap replica map", 1,
helixClusterManager.getBootstrapReplicaMap().size());

// Case 2. We are creating a bootstrap replica for a new partition
String newPartitionId = String.valueOf(idealState.getNumPartitions() + 1000);
// Case 2.1 there is no resource config
assertNull("Missing resource config", helixClusterManager.getBootstrapReplica(newPartitionId, ambryDataNode));
// Case 2.2 resource config created
ResourceConfig resourceConfig = new ResourceConfig.Builder(resourceName).build();
resourceConfig.putSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR, String.valueOf(3 * DEFAULT_REPLICA_CAPACITY_IN_BYTES));
ConfigAccessor configAccessor =
helixFactory.getZKHelixManager(clusterMapConfig.clusterMapClusterName, selfInstanceName, InstanceType.SPECTATOR,
parseDcJsonAndPopulateDcInfo(clusterMapConfig.clusterMapDcsZkConnectStrings).get(localDc)
.getZkConnectStrs()
.get(0)).getConfigAccessor();
configAccessor.setResourceConfig(clusterMapConfig.clusterMapClusterName, resourceName, resourceConfig);
newPartitionId = String.valueOf(idealState.getNumPartitions() + 1001);
bootstrapReplica = helixClusterManager.getBootstrapReplica(newPartitionId, ambryDataNode);
assertNotNull(bootstrapReplica);
assertEquals(3 * DEFAULT_REPLICA_CAPACITY_IN_BYTES, bootstrapReplica.getCapacityInBytes());

helixClusterManager.close();
}

@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
@@ -55,12 +56,15 @@
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

import static org.mockito.Mockito.*;


/**
* A mock implementation of the {@link HelixManager} to use in tests.
@@ -82,6 +86,8 @@ class MockHelixManager implements HelixManager {
private HelixPropertyStore<ZNRecord> helixPropertyStore;
private boolean isAggregatedViewCluster;
private final List<MockHelixAdmin> helixAdminList;
private final ConfigAccessor configAccessor;
private final Map<String, ResourceConfig> resourceConfigs = new ConcurrentHashMap<>();

/**
* Instantiate a MockHelixManager.
@@ -140,6 +146,18 @@ class MockHelixManager implements HelixManager {
}
}
this.isAggregatedViewCluster = isAggregatedViewCluster;

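    // Back the mocked ConfigAccessor with the in-memory resourceConfigs map above so that
    // setResourceConfig() and getResourceConfig() round-trip within a test.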
configAccessor = mock(ConfigAccessor.class);
doAnswer(invocation -> {
String resourceName = invocation.getArgument(1);
return resourceConfigs.get(resourceName);
}).when(configAccessor).getResourceConfig(anyString(), anyString());
doAnswer(invocation -> {
String resourceName = invocation.getArgument(1);
ResourceConfig resourceConfig = invocation.getArgument(2);
resourceConfigs.put(resourceName, resourceConfig);
return null;
}).when(configAccessor).setResourceConfig(anyString(), anyString(), any(ResourceConfig.class));
}

@Override
@@ -432,7 +450,7 @@ public HelixDataAccessor getHelixDataAccessor() {

@Override
public ConfigAccessor getConfigAccessor() {
throw new IllegalStateException("Not implemented");
return configAccessor;
}

@Override