From 48d214568c3565113e74686c43c4a4e146dfac63 Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Fri, 4 Aug 2023 11:41:11 -0700
Subject: [PATCH] WIP: Add default replica capacity in resource config

---
 .../ambry/clustermap/ClusterMapUtils.java     |  1 +
 .../clustermap/HelixBootstrapUpgradeUtil.java |  3 +
 .../ambry/clustermap/HelixClusterManager.java | 62 ++++++++++++++++++-
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/ClusterMapUtils.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/ClusterMapUtils.java
index d7c6c972d2..c364629d60 100644
--- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/ClusterMapUtils.java
+++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/ClusterMapUtils.java
@@ -78,6 +78,7 @@ public class ClusterMapUtils {
   static final String REPLICAS_STR_SEPARATOR = ":";
   static final String REPLICAS_CAPACITY_STR = "replicaCapacityInBytes";
   static final String REPLICA_TYPE_STR = "replicaType";
+  static final String DEFAULT_REPLICA_CAPACITY_STR = "defaultReplicaCapacityInBytes";
   public static final String SSL_PORT_STR = "sslPort";
   public static final String HTTP2_PORT_STR = "http2Port";
   static final String RACKID_STR = "rackId";
diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java
index 1df72266e6..8516d20b46 100644
--- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java
+++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java
@@ -1173,7 +1173,10 @@ private void setResourceConfig(String dcName, String resourceName, ConfigAccesso
         clusterConfigFields.partitionDiskWeightInGB + PARTITION_BUFFER_CAPACITY_FOR_INDEX_FILES_IN_GB));
 
     // 3. Update resource configs in helix
+    final long GB = 1024 * 1024 * 1024;
     resourceConfig.setPartitionCapacityMap(partitionCapacityMap);
+    resourceConfig.putSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR,
+        String.valueOf(GB * clusterConfigFields.partitionDiskWeightInGB));
     configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig);
     info("Updated resource config/partition weights for resource {}. Partition weights: {}", resourceName,
         resourceConfig.getPartitionCapacityMap());
diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java
index 80ed230911..104a087183 100644
--- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java
+++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java
@@ -40,6 +40,7 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -50,6 +51,7 @@
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.spectator.RoutingTableSnapshot;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -114,6 +116,10 @@ public class HelixClusterManager implements ClusterMap {
   final HelixClusterManagerMetrics helixClusterManagerMetrics;
   private HelixAggregatedViewClusterInfo helixAggregatedViewClusterInfo = null;
 
+  // Resource configs fetched in FULL_AUTO mode, keyed by resource name. We probably only need to fetch the one
+  // resource config that this data node belongs to (this data node has to be a server node).
+  private final ConcurrentHashMap<String, ResourceConfig> resourceConfigs = new ConcurrentHashMap<>();
+
   /**
    * Instantiate a HelixClusterManager.
    * @param clusterMapConfig the {@link ClusterMapConfig} associated with this manager.
@@ -785,6 +791,7 @@ private ReplicaId getBootstrapReplicaInSemiAuto(String partitionIdStr, DataNodeI
    * @return {@link ReplicaId} if there is a new replica satisfying given partition and data node. {@code null} otherwise.
    */
   private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeId dataNodeId) {
+    long replicaCapacity = 0;
     AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId);
     if (disk == null) {
       logger.error("No Disk is available to host bootstrap replica. Cannot create the replica.");
@@ -797,11 +804,17 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
       AmbryPartition currentPartition =
           partitionNameToAmbryPartition.putIfAbsent(mappedPartition.toPathString(), mappedPartition);
       if (currentPartition == null) {
+        // If the partition is new, we get the default replica capacity from resource config
         logger.info("Partition {} is currently not present in cluster map, a new partition is created", partitionIdStr);
         currentPartition = mappedPartition;
+        replicaCapacity = getReplicaCapacityFromResource(partitionIdStr, dataNodeId);
+      } else {
+        // For existing partitions, we should already have other replicas in the map
+        replicaCapacity = currentPartition.getReplicaIds().get(0).getCapacityInBytes();
       }
+      disk.decreaseAvailableSpaceInBytes(replicaCapacity);
       AmbryServerReplica replica =
-          new AmbryServerReplica(clusterMapConfig, currentPartition, disk, true, DEFAULT_REPLICA_CAPACITY_IN_BYTES,
+          new AmbryServerReplica(clusterMapConfig, currentPartition, disk, true, replicaCapacity,
               ReplicaSealStatus.NOT_SEALED);
       logger.info("Created bootstrap replica {} for Partition {}", replica, partitionIdStr);
       return replica;
@@ -810,15 +823,58 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
           dataNodeId, e);
       // We have decreased the available space on the disk since we thought that it will be used to host replica. Since
       // bootstrapping replica failed, increase the available disk space back.
-      disk.increaseAvailableSpaceInBytes(DEFAULT_REPLICA_CAPACITY_IN_BYTES);
+      disk.increaseAvailableSpaceInBytes(replicaCapacity);
       return null;
     }
   }
 
+  /**
+   * Get the default replica capacity (in bytes) from the config of the resource that hosts the given partition.
+   * Fetched resource configs are cached in {@link #resourceConfigs}, keyed by resource name.
+   * @param partitionIdStr the partition id string of the bootstrap replica.
+   * @param dataNodeId the {@link DataNodeId} that will host the bootstrap replica.
+   * @return the default replica capacity in bytes stored in the resource config.
+   * @throws IllegalArgumentException if the instance config, resource name, resource config or default replica
+   *                                  capacity can't be found, or if the stored capacity is not a valid long.
+   */
+  private long getReplicaCapacityFromResource(String partitionIdStr, DataNodeId dataNodeId) {
+    String instanceName = getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort());
+    InstanceConfig instanceConfig = localHelixAdmin.getInstanceConfig(clusterName, instanceName);
+    if (instanceConfig == null) {
+      throw new IllegalArgumentException("Instance config for " + instanceName + " doesn't exist");
+    }
+    String dcName = dataNodeId.getDatacenterName();
+    ConfigAccessor configAccessor;
+    if (clusterMapConfig.clusterMapUseAggregatedView) {
+      configAccessor = helixAggregatedViewClusterInfo.helixManager.getConfigAccessor();
+    } else {
+      configAccessor = ((HelixDcInfo) dcToDcInfo.get(dcName)).helixManager.getConfigAccessor();
+    }
+    String resourceName = partitionToResourceNameByDc.get(dcName).get(partitionIdStr);
+    if (resourceName == null) {
+      throw new IllegalArgumentException("Can't find resource for partition " + partitionIdStr + " in dc " + dcName);
+    }
+    // Look up by key: ConcurrentHashMap#contains(Object) is a legacy alias of containsValue, not containsKey.
+    ResourceConfig resourceConfig = resourceConfigs.get(resourceName);
+    if (resourceConfig == null) {
+      resourceConfig = configAccessor.getResourceConfig(clusterName, resourceName);
+      if (resourceConfig == null) {
+        // ConcurrentHashMap rejects null values, so fail with a descriptive message instead of an NPE.
+        throw new IllegalArgumentException("Resource config for " + resourceName + " doesn't exist");
+      }
+      resourceConfigs.putIfAbsent(resourceName, resourceConfig);
+    }
+    String defaultReplicaCapacityStr = resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR);
+    if (defaultReplicaCapacityStr == null) {
+      throw new IllegalArgumentException("Missing default replica capacity from resource " + resourceName
+          + " when creating bootstrap replica for partition " + partitionIdStr);
+    }
+    return Long.parseLong(defaultReplicaCapacityStr);
+  }
+
   /**
    * Get a disk with maximum available space for bootstrapping replica in Full auto mode. This method is synchronized
    * since it can be queried concurrently when multiple replicas are bootstrapped.
-   * TODO Check for disk health as well (Will be added in next PR)
    * @param dataNode the {@link DataNodeId} on which disk is needed
    * @return {@link AmbryDisk} which has maximum available or free capacity. If none of the disks have free space,
    * returns null.