Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default replica capacity in resource config #2531

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class ClusterMapUtils {
static final String REPLICAS_STR_SEPARATOR = ":";
static final String REPLICAS_CAPACITY_STR = "replicaCapacityInBytes";
static final String REPLICA_TYPE_STR = "replicaType";
static final String DEFAULT_REPLICA_CAPACITY_STR = "defaultReplicaCapacityInBytes";
public static final String SSL_PORT_STR = "sslPort";
public static final String HTTP2_PORT_STR = "http2Port";
static final String RACKID_STR = "rackId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,10 @@ private void setResourceConfig(String dcName, String resourceName, ConfigAccesso
clusterConfigFields.partitionDiskWeightInGB + PARTITION_BUFFER_CAPACITY_FOR_INDEX_FILES_IN_GB));

// 3. Update resource configs in helix
final long GB = 1024 * 1024 * 1024;
resourceConfig.setPartitionCapacityMap(partitionCapacityMap);
resourceConfig.putSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR,
String.valueOf(GB * clusterConfigFields.partitionDiskWeightInGB));
configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig);
info("Updated resource config/partition weights for resource {}. Partition weights: {}", resourceName,
resourceConfig.getPartitionCapacityMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
Expand All @@ -50,6 +51,7 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.spectator.RoutingTableSnapshot;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -114,6 +116,10 @@ public class HelixClusterManager implements ClusterMap {
final HelixClusterManagerMetrics helixClusterManagerMetrics;
private HelixAggregatedViewClusterInfo helixAggregatedViewClusterInfo = null;

// In FULL_AUTO mode, the resource configs. We probably only need to fetch one resource config, which this
// data node belongs to (This data node has to be a server node).
private final ConcurrentHashMap<String, ResourceConfig> resourceConfigs = new ConcurrentHashMap<>();

/**
* Instantiate a HelixClusterManager.
* @param clusterMapConfig the {@link ClusterMapConfig} associated with this manager.
Expand Down Expand Up @@ -785,27 +791,27 @@ private ReplicaId getBootstrapReplicaInSemiAuto(String partitionIdStr, DataNodeI
* @return {@link ReplicaId} if there is a new replica satisfying given partition and data node. {@code null} otherwise.
*/
private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeId dataNodeId) {
long replicaCapacity = 0;
AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId);
if (disk == null) {
logger.error("No Disk is available to host bootstrap replica. Cannot create the replica.");
return null;
}
long replicaCapacity = 0;
try {
AmbryPartition mappedPartition =
new AmbryPartition(Long.parseLong(partitionIdStr), clusterMapConfig.clusterMapDefaultPartitionClass,
helixClusterManagerQueryHelper);
AmbryPartition currentPartition =
partitionNameToAmbryPartition.putIfAbsent(mappedPartition.toPathString(), mappedPartition);
if (currentPartition == null) {
// If the partition is new, we get the default replica capacity from resource config
logger.info("Partition {} is currently not present in cluster map, a new partition is created", partitionIdStr);
currentPartition = mappedPartition;
replicaCapacity = DEFAULT_REPLICA_CAPACITY_IN_BYTES;
replicaCapacity = getReplicaCapacityFromResourceConfig(partitionIdStr, dataNodeId);
} else {
// For existing partitions, we should already have other replicas in the map
replicaCapacity = currentPartition.getReplicaIds().get(0).getCapacityInBytes();
}
// Update disk usage
// TODO: Add a metric for this
disk.decreaseAvailableSpaceInBytes(replicaCapacity);
AmbryServerReplica replica =
new AmbryServerReplica(clusterMapConfig, currentPartition, disk, true, replicaCapacity,
Expand All @@ -822,10 +828,48 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
}
}

/**
* Get default replica capacity from resource config. A data node belongs to only one resource, if we are creating a
* new partition for this data node, we can get the default capace from this resource's config.
* @param partitionIdStr The partition id in string.
* @param dataNodeId The data node to create a new replica in.
* @return The default replica capacity
*/
private long getReplicaCapacityFromResourceConfig(String partitionIdStr, DataNodeId dataNodeId) {
String instanceName = getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort());
InstanceConfig instanceConfig = localHelixAdmin.getInstanceConfig(clusterName, instanceName);
if (instanceConfig == null) {
throw new IllegalArgumentException("Instance config for " + instanceName + " doesn't exist");
}
String dcName = dataNodeId.getDatacenterName();
List<String> tags = instanceConfig.getTags();
// we should only have one tag
String tag = tags.get(0);
String resourceName = dcToTagToResourceProperty.get(dcName).get(tag).name;
if (!resourceConfigs.contains(resourceName)) {
ConfigAccessor configAccessor = null;
if (clusterMapConfig.clusterMapUseAggregatedView) {
configAccessor = helixAggregatedViewClusterInfo.helixManager.getConfigAccessor();
} else {
configAccessor = ((HelixDcInfo) dcToDcInfo.get(dcName)).helixManager.getConfigAccessor();
}
logger.info("Fetching resource config for {} to create bootstrap replica for partition {}", resourceName,
partitionIdStr);
ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, resourceName);
resourceConfigs.putIfAbsent(resourceName, resourceConfig);
}
ResourceConfig resourceConfig = resourceConfigs.get(resourceName);
String defaultReplicaCapacityStr = resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR);
if (defaultReplicaCapacityStr == null) {
throw new IllegalArgumentException("Missing default replica capacity from resource " + resourceName
+ " when creating bootstrap replica for partition " + partitionIdStr);
}
return Long.parseLong(defaultReplicaCapacityStr);
}

/**
* Get a disk with maximum available space for bootstrapping replica in Full auto mode. This method is synchronized
* since it can be queried concurrently when multiple replicas are bootstrapped.
* TODO Check for disk health as well (Will be added in next PR)
* @param dataNode the {@link DataNodeId} on which disk is needed
* @return {@link AmbryDisk} which has maximum available or free capacity. If none of the disks have free space,
* returns null.
Expand Down
Loading