Skip to content

Commit

Permalink
Add Setting to adjust the primary constraint weights
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Oct 24, 2024
1 parent 119abaf commit c7d4c3a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void updateAllocationConstraint(String constraint, boolean enable) {
this.constraints.get(constraint).setEnable(enable);
}

public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryThresholdWeight) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryThresholdWeight);
return params.weight(constraints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.predicateKeyToWeightMap;

/**
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
Expand Down Expand Up @@ -44,11 +45,13 @@ static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
private String index;
private long PrimaryConstraintThreshold;

ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
this.balancer = balancer;
this.node = node;
this.index = index;
this.PrimaryConstraintThreshold = primaryConstraintThreshold;
}

public ShardsBalancer getBalancer() {
Expand All @@ -75,9 +78,12 @@ public String getIndex() {
*/
public long weight(Map<String, Constraint> constraints) {
long totalConstraintWeight = 0;
for (Constraint constraint : constraints.values()) {
for (Map.Entry<String, Constraint> entry : constraints.entrySet()) {
String key = entry.getKey();
Constraint constraint = entry.getValue();
if (constraint.test(this)) {
totalConstraintWeight += CONSTRAINT_WEIGHT;
double weight = predicateKeyToWeightMap(key, PrimaryConstraintThreshold);
totalConstraintWeight += weight;
}
}
return totalConstraintWeight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,13 @@ public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreac
return primaryShardCount >= allowedPrimaryShardCount;
};
}

public static long predicateKeyToWeightMap(String key, long primaryConstraintWeight) {
switch (key) {
case CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID:
case CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID:
return primaryConstraintWeight;
default: return CONSTRAINT_WEIGHT;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public void updateRebalanceConstraint(String constraint, boolean enable) {
this.constraints.get(constraint).setEnable(enable);
}

public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryConstraintThreshold);
return params.weight(constraints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<Long> PRIMARY_CONSTRAINT_THRESHOLD_SETTING = Setting.longSetting(
"cluster.routing.allocation.primary_constraint.threshold",
10,
0,
Property.Dynamic,
Property.NodeScope
);

/**
* This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()}
* and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation
Expand Down Expand Up @@ -201,6 +209,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private volatile float threshold;
private volatile long primaryConstraintThreshold;

private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
Expand All @@ -219,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings));
updateWeightFunction();
setThreshold(THRESHOLD_SETTING.get(settings));
setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
Expand All @@ -231,6 +241,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}
Expand Down Expand Up @@ -294,7 +305,7 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan
}

private void updateWeightFunction() {
weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer);
weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer, this.primaryConstraintThreshold);
}

/**
Expand All @@ -317,6 +328,11 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

private void setPrimaryConstraintThresholdSetting(long threshold) {
this.primaryConstraintThreshold = threshold;
this.weightFunction.updatePrimaryConstraintThreshold(threshold);
}

private void setAllocatorTimeout(TimeValue allocatorTimeout) {
this.allocatorTimeout = allocatorTimeout;
}
Expand Down Expand Up @@ -489,10 +505,11 @@ static class WeightFunction {
private final float shardBalance;
private final float theta0;
private final float theta1;
private long primaryConstraintThreshold;
private AllocationConstraints constraints;
private RebalanceConstraints rebalanceConstraints;

WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) {
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
Expand All @@ -501,6 +518,7 @@ static class WeightFunction {
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
this.primaryConstraintThreshold = primaryConstraintThreshold;
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.constraints = new AllocationConstraints();
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
Expand All @@ -510,12 +528,12 @@ static class WeightFunction {

public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
float balancerWeight = weight(balancer, node, index);
return balancerWeight + constraints.weight(balancer, node, index);
return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold);
}

public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) {
float balancerWeight = weight(balancer, node, index);
return balancerWeight + rebalanceConstraints.weight(balancer, node, index);
return balancerWeight + rebalanceConstraints.weight(balancer, node, index, primaryConstraintThreshold);
}

float weight(ShardsBalancer balancer, ModelNode node, String index) {
Expand All @@ -531,6 +549,10 @@ void updateAllocationConstraint(String constraint, boolean enable) {
void updateRebalanceConstraint(String constraint, boolean add) {
this.rebalanceConstraints.updateRebalanceConstraint(constraint, add);
}

void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) {
this.primaryConstraintThreshold = primaryConstraintThreshold;
}
}

/**
Expand Down

0 comments on commit c7d4c3a

Please sign in to comment.