Skip to content

Commit

Permalink
WIP: Add default replica capacity in resource config
Browse files Browse the repository at this point in the history
  • Loading branch information
justinlin-linkedin committed Aug 4, 2023
1 parent 8043f44 commit 48d2145
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class ClusterMapUtils {
static final String REPLICAS_STR_SEPARATOR = ":";
static final String REPLICAS_CAPACITY_STR = "replicaCapacityInBytes";
static final String REPLICA_TYPE_STR = "replicaType";
static final String DEFAULT_REPLICA_CAPACITY_STR = "defaultReplicaCapacityInBytes";
public static final String SSL_PORT_STR = "sslPort";
public static final String HTTP2_PORT_STR = "http2Port";
static final String RACKID_STR = "rackId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,10 @@ private void setResourceConfig(String dcName, String resourceName, ConfigAccesso
clusterConfigFields.partitionDiskWeightInGB + PARTITION_BUFFER_CAPACITY_FOR_INDEX_FILES_IN_GB));

// 3. Update resource configs in helix
final long GB = 1024 * 1024 * 1024;
resourceConfig.setPartitionCapacityMap(partitionCapacityMap);
resourceConfig.putSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR,
String.valueOf(GB * clusterConfigFields.partitionDiskWeightInGB));
configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig);
info("Updated resource config/partition weights for resource {}. Partition weights: {}", resourceName,
resourceConfig.getPartitionCapacityMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
Expand All @@ -50,6 +51,7 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.spectator.RoutingTableSnapshot;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -114,6 +116,10 @@ public class HelixClusterManager implements ClusterMap {
final HelixClusterManagerMetrics helixClusterManagerMetrics;
private HelixAggregatedViewClusterInfo helixAggregatedViewClusterInfo = null;

// Cache of Helix resource configs, keyed by resource name, used in FULL_AUTO mode. We probably only need to fetch
// the one resource config for the resource this data node belongs to (this data node has to be a server node).
private final ConcurrentHashMap<String, ResourceConfig> resourceConfigs = new ConcurrentHashMap<>();

/**
* Instantiate a HelixClusterManager.
* @param clusterMapConfig the {@link ClusterMapConfig} associated with this manager.
Expand Down Expand Up @@ -785,6 +791,7 @@ private ReplicaId getBootstrapReplicaInSemiAuto(String partitionIdStr, DataNodeI
* @return {@link ReplicaId} if there is a new replica satisfying given partition and data node. {@code null} otherwise.
*/
private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeId dataNodeId) {
long replicaCapacity = 0;
AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId);
if (disk == null) {
logger.error("No Disk is available to host bootstrap replica. Cannot create the replica.");
Expand All @@ -797,11 +804,17 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
AmbryPartition currentPartition =
partitionNameToAmbryPartition.putIfAbsent(mappedPartition.toPathString(), mappedPartition);
if (currentPartition == null) {
// If the partition is new, we get the default replica capacity from resource config
logger.info("Partition {} is currently not present in cluster map, a new partition is created", partitionIdStr);
currentPartition = mappedPartition;
replicaCapacity = getReplicaCapacityFromResource(partitionIdStr, dataNodeId);
} else {
// For existing partitions, we should already have other replicas in the map
replicaCapacity = currentPartition.getReplicaIds().get(0).getCapacityInBytes();
}
disk.decreaseAvailableSpaceInBytes(replicaCapacity);
AmbryServerReplica replica =
new AmbryServerReplica(clusterMapConfig, currentPartition, disk, true, DEFAULT_REPLICA_CAPACITY_IN_BYTES,
new AmbryServerReplica(clusterMapConfig, currentPartition, disk, true, replicaCapacity,
ReplicaSealStatus.NOT_SEALED);
logger.info("Created bootstrap replica {} for Partition {}", replica, partitionIdStr);
return replica;
Expand All @@ -810,15 +823,42 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
dataNodeId, e);
// We have decreased the available space on the disk since we thought that it will be used to host replica. Since
// bootstrapping replica failed, increase the available disk space back.
disk.increaseAvailableSpaceInBytes(DEFAULT_REPLICA_CAPACITY_IN_BYTES);
disk.increaseAvailableSpaceInBytes(replicaCapacity);
return null;
}
}

/**
 * Get the default replica capacity (in bytes) for a partition from the config of the Helix resource that the
 * partition belongs to. Resource configs are cached in {@code resourceConfigs} by resource name so that the config
 * is fetched through the {@link ConfigAccessor} only on the first lookup for each resource.
 * @param partitionIdStr the partition id string of the replica being bootstrapped.
 * @param dataNodeId the {@link DataNodeId} on which the replica is being created.
 * @return the value of {@code DEFAULT_REPLICA_CAPACITY_STR} in the resource config, parsed as a long.
 * @throws IllegalArgumentException if the instance config of the data node doesn't exist, the partition is not
 *         mapped to any resource, the resource config doesn't exist, or the default replica capacity is missing or
 *         is not a valid number.
 */
private long getReplicaCapacityFromResource(String partitionIdStr, DataNodeId dataNodeId) {
  String instanceName = getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort());
  // The instance config itself is not used below; the lookup only verifies that the data node is known to helix.
  InstanceConfig instanceConfig = localHelixAdmin.getInstanceConfig(clusterName, instanceName);
  if (instanceConfig == null) {
    throw new IllegalArgumentException("Instance config for " + instanceName + " doesn't exist");
  }
  String dcName = dataNodeId.getDatacenterName();
  String resourceName = partitionToResourceNameByDc.get(dcName).get(partitionIdStr);
  if (resourceName == null) {
    throw new IllegalArgumentException(
        "Partition " + partitionIdStr + " is not mapped to any resource in datacenter " + dcName);
  }
  // Look up the cache by key. ConcurrentHashMap#contains(Object) is a legacy method that searches the values, so it
  // can't be used to test whether a resource name has already been cached.
  ResourceConfig resourceConfig = resourceConfigs.get(resourceName);
  if (resourceConfig == null) {
    ConfigAccessor configAccessor =
        clusterMapConfig.clusterMapUseAggregatedView ? helixAggregatedViewClusterInfo.helixManager.getConfigAccessor()
            : ((HelixDcInfo) dcToDcInfo.get(dcName)).helixManager.getConfigAccessor();
    ResourceConfig fetchedConfig = configAccessor.getResourceConfig(clusterName, resourceName);
    if (fetchedConfig == null) {
      // ConcurrentHashMap doesn't accept null values, so fail with a descriptive error rather than caching it.
      throw new IllegalArgumentException("Resource config for resource " + resourceName
          + " doesn't exist when creating bootstrap replica for partition " + partitionIdStr);
    }
    // If another thread cached a config for this resource in the meantime, keep using that one.
    ResourceConfig existingConfig = resourceConfigs.putIfAbsent(resourceName, fetchedConfig);
    resourceConfig = existingConfig != null ? existingConfig : fetchedConfig;
  }
  String defaultReplicaCapacityStr = resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR);
  if (defaultReplicaCapacityStr == null) {
    throw new IllegalArgumentException("Missing default replica capacity from resource " + resourceName
        + " when creating bootstrap replica for partition " + partitionIdStr);
  }
  try {
    return Long.parseLong(defaultReplicaCapacityStr);
  } catch (NumberFormatException e) {
    throw new IllegalArgumentException(
        "Invalid default replica capacity " + defaultReplicaCapacityStr + " from resource " + resourceName
            + " when creating bootstrap replica for partition " + partitionIdStr, e);
  }
}

/**
* Get a disk with maximum available space for bootstrapping replica in Full auto mode. This method is synchronized
* since it can be queried concurrently when multiple replicas are bootstrapped.
* TODO Check for disk health as well (Will be added in next PR)
* @param dataNode the {@link DataNodeId} on which disk is needed
* @return {@link AmbryDisk} which has maximum available or free capacity. If none of the disks have free space,
* returns null.
Expand Down

0 comments on commit 48d2145

Please sign in to comment.