diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index fa6853191f9..cb6ee99c4fc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -30,10 +30,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.LocalPinotFS; import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.TimeUtils; import static org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_CONTROLLER_METRICS_PREFIX; @@ -962,12 +962,12 @@ public boolean enableSegmentRelocatorLocalTierMigration() { public long getSegmentRelocatorExternalViewCheckIntervalInMs() { return getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, - RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS); + RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS); } public long getSegmentRelocatorExternalViewStabilizationTimeoutInMs() { return getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, - RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS); + RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS); } public boolean isSegmentRelocatorRebalanceTablesSequentially() { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index b4dda6a56a0..9cc7020a1d1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -64,8 +64,6 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.AccessOption; @@ -94,6 +92,8 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; @@ -114,7 +114,6 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.apache.zookeeper.data.Stat; @@ -621,20 +620,18 @@ public RebalanceResult rebalance( String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr); - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun); - rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, reassignInstances); - rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, includeConsuming); - rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, bootstrap); - rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime); - rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, minAvailableReplicas); - rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, bestEfforts); - rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, - externalViewCheckIntervalInMs); - rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, - externalViewStabilizationTimeoutInMs); - rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, updateTargetTier); - rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, TableRebalancer.createUniqueRebalanceJobIdentifier()); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(dryRun); + rebalanceConfig.setReassignInstances(reassignInstances); + rebalanceConfig.setIncludeConsuming(includeConsuming); + rebalanceConfig.setBootstrap(bootstrap); + rebalanceConfig.setDowntime(downtime); + rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas); + rebalanceConfig.setBestEfforts(bestEfforts); + rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs); + rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs); + rebalanceConfig.setUpdateTargetTier(updateTargetTier); + rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier()); try { if (dryRun || downtime) { @@ -642,13 +639,13 @@ public RebalanceResult rebalance( return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, false); } else { // Make a dry-run first to get the target assignment - rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, true); + rebalanceConfig.setDryRun(true); RebalanceResult dryRunResult = _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, false); if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) { // If dry-run succeeded, run rebalance asynchronously - rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, false); + rebalanceConfig.setDryRun(false); _executorService.submit(() -> { try { _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, true); @@ -744,13 +741,12 @@ public ServerRebalanceJobStatusResponse rebalanceStatus( throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId, Response.Status.NOT_FOUND); } - TableRebalanceProgressStats tableRebalanceProgressStats = - JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS), - TableRebalanceProgressStats.class); + TableRebalanceProgressStats tableRebalanceProgressStats = JsonUtils.stringToObject( + controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS), + TableRebalanceProgressStats.class); long timeSinceStartInSecs = 0L; - if (!tableRebalanceProgressStats.getStatus().equals(RebalanceResult.Status.DONE)) { - timeSinceStartInSecs = - (System.currentTimeMillis() - tableRebalanceProgressStats.getStartTimeMs()) / 1000; + if (!RebalanceResult.Status.DONE.toString().equals(tableRebalanceProgressStats.getStatus())) { + timeSinceStartInSecs = (System.currentTimeMillis() - tableRebalanceProgressStats.getStartTimeMs()) / 1000; } ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new ServerRebalanceJobStatusResponse(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java index f160bba80d3..3aa5da48e91 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java @@ -58,7 +58,8 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; -import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceContext; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer; @@ -70,7 +71,6 @@ import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.tenant.TenantRole; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -584,9 +584,9 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceContext context) { - context.setTenantName(tenantName); - return _tenantRebalancer.rebalance(context); + @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + config.setTenantName(tenantName); + return _tenantRebalancer.rebalance(config); } @GET @@ -606,9 +606,9 @@ public TenantRebalanceJobStatusResponse rebalanceStatus( throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId, Response.Status.NOT_FOUND); } - TenantRebalanceProgressStats tenantRebalanceProgressStats = - JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS), - TenantRebalanceProgressStats.class); + TenantRebalanceProgressStats tenantRebalanceProgressStats = JsonUtils.stringToObject( + controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS), + TenantRebalanceProgressStats.class); long timeSinceStartInSecs = tenantRebalanceProgressStats.getTimeToFinishInSeconds(); if (tenantRebalanceProgressStats.getCompletionStatusMsg() == null) { timeSinceStartInSecs = diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index fe54d3cb7bc..d73753ef1c8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -56,7 +56,6 @@ import javax.ws.rs.NotFoundException; import javax.ws.rs.core.Response; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.AccessOption; @@ -144,6 +143,7 @@ import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver; @@ -175,7 +175,6 @@ import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -3089,21 +3088,20 @@ private PinotResourceManagerResponse enableInstance(String instanceName, boolean * Entry point for table Rebalacing. * @param tableNameWithType * @param rebalanceConfig - * @param trackRebalanceProgress - Do we want to track rebalance progress stats + * @param trackRebalanceProgress whether to track rebalance progress stats * @return RebalanceResult * @throws TableNotFoundException */ - public RebalanceResult rebalanceTable(String tableNameWithType, Configuration rebalanceConfig, + public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig rebalanceConfig, boolean trackRebalanceProgress) throws TableNotFoundException { TableConfig tableConfig = getTableConfig(tableNameWithType); if (tableConfig == null) { throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType); } - String rebalanceJobId = rebalanceConfig.getString(RebalanceConfigConstants.JOB_ID); - Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig "); - if (rebalanceConfig.getBoolean(RebalanceConfigConstants.UPDATE_TARGET_TIER, - RebalanceConfigConstants.DEFAULT_UPDATE_TARGET_TIER)) { + String rebalanceJobId = rebalanceConfig.getJobId(); + Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig"); + if (rebalanceConfig.isUpdateTargetTier()) { updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig); } ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java index 36f784515ab..05eb7d40488 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java @@ -23,15 +23,14 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.AllServersSegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; /** @@ -61,7 +60,7 @@ public List assignSegment(String segmentName, Map> rebalanceTable(Map> currentAssignment, Map instancePartitionsMap, @Nullable List sortedTiers, - @Nullable Map tierInstancePartitionsMap, Configuration config) { + @Nullable Map tierInstancePartitionsMap, RebalanceConfig config) { InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); Preconditions .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s", @@ -78,9 +77,7 @@ public Map> rebalanceTable(Map>>, Map>> pair = rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index db8b375a4cd..434dbec4e35 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -25,15 +25,14 @@ import java.util.TreeMap; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; /** @@ -174,17 +173,14 @@ protected List assignConsumingSegment(int segmentPartitionId, InstancePa @Override public Map> rebalanceTable(Map> currentAssignment, Map instancePartitionsMap, @Nullable List sortedTiers, - @Nullable Map tierInstancePartitionsMap, Configuration config) { + @Nullable Map tierInstancePartitionsMap, RebalanceConfig config) { InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED); InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); Preconditions .checkState(consumingInstancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", _tableNameWithType); - boolean includeConsuming = config - .getBoolean(RebalanceConfigConstants.INCLUDE_CONSUMING, RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING); - boolean bootstrap = - config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP); - + boolean includeConsuming = config.isIncludeConsuming(); + boolean bootstrap = config.isBootstrap(); // Rebalance tiers first Pair>>, Map>> pair = rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java index c2f8f5d61b5..12ec0b55454 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java @@ -21,11 +21,11 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.commons.configuration.Configuration; import org.apache.helix.HelixManager; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.tier.Tier; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; @@ -65,5 +65,5 @@ List assignSegment(String segmentName, Map> */ Map> rebalanceTable(Map> currentAssignment, Map instancePartitionsMap, @Nullable List sortedTiers, - @Nullable Map tierInstancePartitionsMap, Configuration config); + @Nullable Map tierInstancePartitionsMap, RebalanceConfig config); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java new file mode 100644 index 00000000000..b2a307f2e5e --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + + +@ApiModel +public class RebalanceConfig { + public static final int DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 1; + public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 1000L; // 1 second + public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 3600000L; // 1 hour + + // Whether to rebalance table in dry-run mode + @JsonProperty("dryRun") + @ApiModelProperty(example = "false") + private boolean _dryRun = false; + + // Whether to reassign instances before reassigning segments + @JsonProperty("reassignInstances") + @ApiModelProperty(example = "false") + private boolean _reassignInstances = false; + + // Whether to reassign CONSUMING segments + @JsonProperty("includeConsuming") + @ApiModelProperty(example = "false") + private boolean _includeConsuming = false; + + // Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, reassign all segments in a + // round-robin fashion as if adding new segments to an empty table) + @JsonProperty("bootstrap") + @ApiModelProperty(example = "false") + private boolean _bootstrap = false; + + // Whether to allow downtime for the rebalance + @JsonProperty("downtime") + @ApiModelProperty(example = "false") + private boolean _downtime = false; + + // For no-downtime rebalance, minimum number of replicas to keep alive during rebalance, or maximum number of replicas + // allowed to be unavailable if value is negative + @JsonProperty("minAvailableReplicas") + @ApiModelProperty(example = "1") + private int _minAvailableReplicas = DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME; + + // Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot be achieved) + // When using best-efforts to rebalance, the following scenarios won't fail the rebalance (will log warnings instead): + // - Segment falls into ERROR state in ExternalView -> count ERROR state as good state + // - ExternalView has not converged within the maximum wait time -> continue to the next stage + @JsonProperty("bestEfforts") + @ApiModelProperty(example = "false") + private boolean _bestEfforts = false; + + // The check on external view can be very costly when the table has very large ideal and external states, i.e. when + // having a huge number of segments. These two configs help reduce the cpu load on controllers, e.g. by doing the + // check less frequently and bail out sooner to rebalance at best effort if configured so. + @JsonProperty("externalViewCheckIntervalInMs") + @ApiModelProperty(example = "1000") + private long _externalViewCheckIntervalInMs = DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS; + + @JsonProperty("externalViewStabilizationTimeoutInMs") + @ApiModelProperty(example = "3600000") + private long _externalViewStabilizationTimeoutInMs = DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS; + + @JsonProperty("updateTargetTier") + @ApiModelProperty(example = "false") + private boolean _updateTargetTier = false; + + @JsonProperty("jobId") + private String _jobId = null; + + public boolean isDryRun() { + return _dryRun; + } + + public void setDryRun(boolean dryRun) { + _dryRun = dryRun; + } + + public boolean isReassignInstances() { + return _reassignInstances; + } + + public void setReassignInstances(boolean reassignInstances) { + _reassignInstances = reassignInstances; + } + + public boolean isIncludeConsuming() { + return _includeConsuming; + } + + public void setIncludeConsuming(boolean includeConsuming) { + _includeConsuming = includeConsuming; + } + + public boolean isBootstrap() { + return _bootstrap; + } + + public void setBootstrap(boolean bootstrap) { + _bootstrap = bootstrap; + } + + public boolean isDowntime() { + return _downtime; + } + + public void setDowntime(boolean downtime) { + _downtime = downtime; + } + + public int getMinAvailableReplicas() { + return _minAvailableReplicas; + } + + public void setMinAvailableReplicas(int minAvailableReplicas) { + _minAvailableReplicas = minAvailableReplicas; + } + + public boolean isBestEfforts() { + return _bestEfforts; + } + + public void setBestEfforts(boolean bestEfforts) { + _bestEfforts = bestEfforts; + } + + public long getExternalViewCheckIntervalInMs() { + return _externalViewCheckIntervalInMs; + } + + public void setExternalViewCheckIntervalInMs(long externalViewCheckIntervalInMs) { + _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs; + } + + public long getExternalViewStabilizationTimeoutInMs() { + return _externalViewStabilizationTimeoutInMs; + } + + public void setExternalViewStabilizationTimeoutInMs(long externalViewStabilizationTimeoutInMs) { + _externalViewStabilizationTimeoutInMs = externalViewStabilizationTimeoutInMs; + } + + public boolean isUpdateTargetTier() { + return _updateTargetTier; + } + + public void setUpdateTargetTier(boolean updateTargetTier) { + _updateTargetTier = updateTargetTier; + } + + public String getJobId() { + return _jobId; + } + + public void setJobId(String jobId) { + _jobId = jobId; + } + + public static RebalanceConfig copy(RebalanceConfig cfg) { + RebalanceConfig rc = new RebalanceConfig(); + rc._dryRun = cfg._dryRun; + rc._reassignInstances = cfg._reassignInstances; + rc._includeConsuming = cfg._includeConsuming; + rc._bootstrap = cfg._bootstrap; + rc._downtime = cfg._downtime; + rc._minAvailableReplicas = cfg._minAvailableReplicas; + rc._bestEfforts = cfg._bestEfforts; + rc._externalViewCheckIntervalInMs = cfg._externalViewCheckIntervalInMs; + rc._externalViewStabilizationTimeoutInMs = cfg._externalViewStabilizationTimeoutInMs; + rc._updateTargetTier = cfg._updateTargetTier; + rc._jobId = cfg._jobId; + return rc; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java deleted file mode 100644 index cd6e06c399a..00000000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.rebalance; - -import com.fasterxml.jackson.annotation.JsonProperty; -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; - -@ApiModel -public class RebalanceContext { - // TODO : simplify the rebalance configs wherever possible - @JsonProperty("dryRun") - @ApiModelProperty(example = "false") - private Boolean _dryRun = false; - @JsonProperty("reassignInstances") - @ApiModelProperty(example = "false") - private Boolean _reassignInstances = false; - @JsonProperty("includeConsuming") - @ApiModelProperty(example = "false") - private Boolean _includeConsuming = false; - @JsonProperty("bootstrap") - @ApiModelProperty(example = "false") - private Boolean _bootstrap = false; - @JsonProperty("downtime") - @ApiModelProperty(example = "false") - private Boolean _downtime = false; - @JsonProperty("minAvailableReplicas") - @ApiModelProperty(example = "1") - private Integer _minAvailableReplicas = 1; - @JsonProperty("bestEfforts") - @ApiModelProperty(example = "false") - private Boolean _bestEfforts = false; - @JsonProperty("externalViewCheckIntervalInMs") - @ApiModelProperty(example = "1000") - private Long _externalViewCheckIntervalInMs = 1000L; - @JsonProperty("externalViewStabilizationTimeoutInMs") - @ApiModelProperty(example = "3600000") - private Long _externalViewStabilizationTimeoutInMs = 3600000L; - @JsonProperty("updateTargetTier") - @ApiModelProperty(example = "false") - private Boolean _updateTargetTier = false; - - public Boolean isDryRun() { - return _dryRun; - } - - public void setDryRun(Boolean dryRun) { - _dryRun = dryRun; - } - - public Boolean isReassignInstances() { - return _reassignInstances; - } - - public void setReassignInstances(Boolean reassignInstances) { - _reassignInstances = reassignInstances; - } - - public Boolean isIncludeConsuming() { - return _includeConsuming; - } - - public void setIncludeConsuming(Boolean includeConsuming) { - _includeConsuming = includeConsuming; - } - - public Boolean isBootstrap() { - return _bootstrap; - } - - public void setBootstrap(Boolean bootstrap) { - _bootstrap = bootstrap; - } - - public Boolean isDowntime() { - return _downtime; - } - - public void setDowntime(Boolean downtime) { - _downtime = downtime; - } - - public Integer getMinAvailableReplicas() { - return _minAvailableReplicas; - } - - public void setMinAvailableReplicas(Integer minAvailableReplicas) { - _minAvailableReplicas = minAvailableReplicas; - } - - public Boolean isBestEfforts() { - return _bestEfforts; - } - - public void setBestEfforts(Boolean bestEfforts) { - _bestEfforts = bestEfforts; - } - - public Long getExternalViewCheckIntervalInMs() { - return _externalViewCheckIntervalInMs; - } - - public void setExternalViewCheckIntervalInMs(Long externalViewCheckIntervalInMs) { - _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs; - } - - public Long getExternalViewStabilizationTimeoutInMs() { - return _externalViewStabilizationTimeoutInMs; - } - - public void setExternalViewStabilizationTimeoutInMs(Long externalViewStabilizationTimeoutInMs) { - _externalViewStabilizationTimeoutInMs = externalViewStabilizationTimeoutInMs; - } - - public Boolean isUpdateTargetTier() { - return _updateTargetTier; - } - - public void setUpdateTargetTier(Boolean updateTargetTier) { - _updateTargetTier = updateTargetTier; - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java new file mode 100644 index 00000000000..8070d449720 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +public class RebalanceJobConstants { + private RebalanceJobConstants() { + } + + // Progress status of the rebalance operartion + public static final String JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS = "REBALANCE_PROGRESS_STATS"; +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 1a3b7e12f68..74e0f2f5a67 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -33,7 +33,6 @@ import java.util.function.ToIntFunction; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.apache.helix.AccessOption; @@ -64,7 +63,6 @@ import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,40 +136,27 @@ public static String createUniqueRebalanceJobIdentifier() { return UUID.randomUUID().toString(); } - public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanceConfig) { + public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig) { long startTimeMs = System.currentTimeMillis(); String tableNameWithType = tableConfig.getTableName(); - String rebalanceJobId = rebalanceConfig.getString(RebalanceConfigConstants.JOB_ID); + String rebalanceJobId = rebalanceConfig.getJobId(); if (rebalanceJobId == null) { // If not passed along, create one. // TODO - Add rebalanceJobId to all log messages for easy tracking. rebalanceJobId = createUniqueRebalanceJobIdentifier(); } - - boolean dryRun = - rebalanceConfig.getBoolean(RebalanceConfigConstants.DRY_RUN, RebalanceConfigConstants.DEFAULT_DRY_RUN); - boolean reassignInstances = rebalanceConfig.getBoolean(RebalanceConfigConstants.REASSIGN_INSTANCES, - RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES); - boolean includeConsuming = rebalanceConfig.getBoolean(RebalanceConfigConstants.INCLUDE_CONSUMING, - RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING); - boolean bootstrap = - rebalanceConfig.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP); - boolean downtime = - rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME, RebalanceConfigConstants.DEFAULT_DOWNTIME); - int minReplicasToKeepUpForNoDowntime = - rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, - RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME); + boolean dryRun = rebalanceConfig.isDryRun(); + boolean reassignInstances = rebalanceConfig.isReassignInstances(); + boolean includeConsuming = rebalanceConfig.isIncludeConsuming(); + boolean bootstrap = rebalanceConfig.isBootstrap(); + boolean downtime = rebalanceConfig.isDowntime(); + int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas(); + boolean bestEfforts = rebalanceConfig.isBestEfforts(); + long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs(); + long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs(); boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase( tableConfig.getRoutingConfig().getInstanceSelectorType()); - boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS, - RebalanceConfigConstants.DEFAULT_BEST_EFFORTS); - long externalViewCheckIntervalInMs = - rebalanceConfig.getLong(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, - RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS); - long externalViewStabilizationTimeoutInMs = - rebalanceConfig.getLong(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, - RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS); LOGGER.info( "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, " + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}, " diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index 5db4bebf588..1978df35f39 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -27,7 +27,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +133,7 @@ private void trackStatsInZk() { jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name()); try { - jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS, + jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(_tableRebalanceProgressStats)); } catch (JsonProcessingException e) { LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _rebalanceJobId, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java index d0278c15e39..04d18230ae5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java @@ -29,13 +29,11 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.exception.TableNotFoundException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,21 +48,22 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag } @Override - public TenantRebalanceResult rebalance(TenantRebalanceContext context) { + public TenantRebalanceResult rebalance(TenantRebalanceConfig config) { Map rebalanceResult = new HashMap<>(); - Set tables = getTenantTables(context.getTenantName()); + Set tables = getTenantTables(config.getTenantName()); tables.forEach(table -> { try { - Configuration config = extractRebalanceConfig(context); - config.setProperty(RebalanceConfigConstants.DRY_RUN, true); - rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false)); + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setJobId(createUniqueRebalanceJobIdentifier()); + rebalanceConfig.setDryRun(true); + rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, false)); } catch (TableNotFoundException exception) { rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), null, null, null)); } }); - if (context.isDryRun() || context.isDowntime()) { - return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult()); + if (config.isDryRun() || config.isDowntime()) { + return new TenantRebalanceResult(null, rebalanceResult, config.isVerboseResult()); } else { for (String table : rebalanceResult.keySet()) { RebalanceResult result = rebalanceResult.get(table); @@ -77,25 +76,25 @@ public TenantRebalanceResult rebalance(TenantRebalanceContext context) { } String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier(); - TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(), + TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(), tables, _pinotHelixResourceManager); observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null); final Deque sequentialQueue = new LinkedList<>(); final Deque parallelQueue = new ConcurrentLinkedDeque<>(); // ensure atleast 1 thread is created to run the sequential table rebalance operations - int parallelism = Math.max(context.getDegreeOfParallelism(), 1); - Set dimTables = getDimensionalTables(context.getTenantName()); + int parallelism = Math.max(config.getDegreeOfParallelism(), 1); + Set dimTables = getDimensionalTables(config.getTenantName()); AtomicInteger activeThreads = new AtomicInteger(parallelism); try { if (parallelism > 1) { Set parallelTables; - if (!context.getParallelWhitelist().isEmpty()) { - parallelTables = new HashSet<>(context.getParallelWhitelist()); + if (!config.getParallelWhitelist().isEmpty()) { + parallelTables = new HashSet<>(config.getParallelWhitelist()); } else { parallelTables = new HashSet<>(tables); } - if (!context.getParallelBlacklist().isEmpty()) { - parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist()); + if (!config.getParallelBlacklist().isEmpty()) { + parallelTables = Sets.difference(parallelTables, config.getParallelBlacklist()); } parallelTables.forEach(table -> { if (dimTables.contains(table)) { @@ -131,33 +130,33 @@ public TenantRebalanceResult rebalance(TenantRebalanceContext context) { if (table == null) { break; } - Configuration config = extractRebalanceConfig(context); - config.setProperty(RebalanceConfigConstants.DRY_RUN, false); - config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId()); - rebalanceTable(table, config, observer); + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(false); + rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId()); + rebalanceTable(table, rebalanceConfig, observer); } // Last parallel thread to finish the table rebalance job will pick up the // sequential table rebalance execution if (activeThreads.decrementAndGet() == 0) { - Configuration config = extractRebalanceConfig(context); - config.setProperty(RebalanceConfigConstants.DRY_RUN, false); + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(false); while (true) { String table = sequentialQueue.pollFirst(); if (table == null) { break; } - config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId()); - rebalanceTable(table, config, observer); + rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId()); + rebalanceTable(table, rebalanceConfig, observer); } - observer.onSuccess(String.format("Successfully rebalanced tenant %s.", context.getTenantName())); + observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName())); } }); } } catch (Exception exception) { - observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", context.getTenantName(), + observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", config.getTenantName(), exception.getMessage())); } - return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, context.isVerboseResult()); + return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, config.isVerboseResult()); } private Set getDimensionalTables(String tenantName) { @@ -175,25 +174,6 @@ private Set getDimensionalTables(String tenantName) { return dimTables; } - private Configuration extractRebalanceConfig(TenantRebalanceContext context) { - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, context.isDryRun()); - rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, context.isReassignInstances()); - rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, context.isIncludeConsuming()); - rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, context.isBootstrap()); - rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, context.isDowntime()); - rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, - context.getMinAvailableReplicas()); - rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, context.isBestEfforts()); - rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, - context.getExternalViewCheckIntervalInMs()); - rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, - context.getExternalViewStabilizationTimeoutInMs()); - rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, context.isUpdateTargetTier()); - rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, createUniqueRebalanceJobIdentifier()); - return rebalanceConfig; - } - private String createUniqueRebalanceJobIdentifier() { return UUID.randomUUID().toString(); } @@ -214,11 +194,10 @@ private Set getTenantTables(String tenantName) { return tables; } - private void rebalanceTable(String tableName, Configuration config, + private void rebalanceTable(String tableName, RebalanceConfig config, TenantRebalanceObserver observer) { try { - observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, - config.getString(RebalanceConfigConstants.JOB_ID)); + observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, config.getJobId()); RebalanceResult result = _pinotHelixResourceManager.rebalanceTable(tableName, config, true); if (result.getStatus().equals(RebalanceResult.Status.DONE)) { observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java similarity index 95% rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java index 5e76dcc014c..3ca6bd777eb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java @@ -23,15 +23,15 @@ import io.swagger.annotations.ApiModelProperty; import java.util.HashSet; import java.util.Set; -import org.apache.pinot.controller.helix.core.rebalance.RebalanceContext; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; -public class TenantRebalanceContext extends RebalanceContext { +public class TenantRebalanceConfig extends RebalanceConfig { @JsonIgnore private String _tenantName; @JsonProperty("degreeOfParallelism") @ApiModelProperty(example = "1") - private Integer _degreeOfParallelism = 1; + private int _degreeOfParallelism = 1; @JsonProperty("parallelWhitelist") private Set _parallelWhitelist = new HashSet<>(); @JsonProperty("parallelBlacklist") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java index 53df7824d53..4b28b305691 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.controller.helix.core.rebalance.tenant; - public interface TenantRebalancer { - TenantRebalanceResult rebalance(TenantRebalanceContext context); + TenantRebalanceResult rebalance(TenantRebalanceConfig config); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java index 7521caa3f3d..6c269855392 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java @@ -28,9 +28,9 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +103,7 @@ private void trackStatsInZk() { jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name()); try { - jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS, + jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(_progressStats)); } catch (JsonProcessingException e) { LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _jobId, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java index ad9cd1f0f51..0a77f9ff1f3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java @@ -30,8 +30,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.helix.ClusterMessagingService; import org.apache.helix.Criteria; import org.apache.helix.InstanceType; @@ -44,12 +42,12 @@ import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.util.TableTierReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,15 +164,12 @@ private void rebalanceTable(String tableNameWithType) { } // Allow at most one replica unavailable during relocation - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, -1); - rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, - _externalViewCheckIntervalInMs); - rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, - _externalViewStabilizationTimeoutInMs); - rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, - TierConfigUtils.shouldRelocateToTiers(tableConfig)); - rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, TableRebalancer.createUniqueRebalanceJobIdentifier()); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setMinAvailableReplicas(-1); + rebalanceConfig.setExternalViewCheckIntervalInMs(_externalViewCheckIntervalInMs); + rebalanceConfig.setExternalViewStabilizationTimeoutInMs(_externalViewStabilizationTimeoutInMs); + rebalanceConfig.setUpdateTargetTier(TierConfigUtils.shouldRelocateToTiers(tableConfig)); + rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier()); try { // Relocating segments to new tiers needs two sequential actions: table rebalance and local tier migration. diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java index af75b05c23a..c9a42536d3d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java @@ -25,20 +25,19 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; +import java.util.TreeSet; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.tier.PinotServerTierStorage; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.tier.TierSegmentSelector; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -165,7 +164,7 @@ public void testTableBalanced() { // On rebalancing, segments move to tiers Map> newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, _sortedTiers, - _tierInstancePartitionsMap, new BaseConfiguration()); + _tierInstancePartitionsMap, new RebalanceConfig()); assertEquals(newAssignment.size(), NUM_SEGMENTS); // segments 0-49 remain unchanged @@ -205,7 +204,7 @@ public void testTableBalanced() { // rebalance without tierInstancePartitions resets the assignment Map> resetAssignment = - _segmentAssignment.rebalanceTable(newAssignment, _instancePartitionsMap, null, null, new BaseConfiguration()); + _segmentAssignment.rebalanceTable(newAssignment, _instancePartitionsMap, null, null, new RebalanceConfig()); for (String segment : SEGMENTS) { Assert.assertTrue(INSTANCES.containsAll(resetAssignment.get(segment).keySet())); } @@ -214,7 +213,10 @@ public void testTableBalanced() { @Test public void testBootstrapTable() { Map> currentAssignment = new TreeMap<>(); - for (String segmentName : SEGMENTS) { + // The list of segments are segment_0, segment_1, ... segment_10, ...; but TreeMap sorts segments as segment_0, + // segment_1, segment_10, ... segment_2, ... So to make bootstrap generate same assignment as assigning each + // segment separately, we need to process the segments in the same order. + for (String segmentName : new TreeSet<>(SEGMENTS)) { List instancesAssigned = _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); currentAssignment.put(segmentName, @@ -222,11 +224,11 @@ public void testBootstrapTable() { } // Bootstrap table should reassign all segments - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); Map> newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, _sortedTiers, - _tierInstancePartitionsMap, new BaseConfiguration()); + _tierInstancePartitionsMap, rebalanceConfig); assertEquals(newAssignment.size(), NUM_SEGMENTS); // segments 0-49 remain unchanged diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index 4ab378e3a74..9d921d3af84 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -24,18 +24,17 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.commons.configuration.BaseConfiguration; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -227,13 +226,12 @@ public void testRelocateCompletedSegments() { Map noRelocationInstancePartitionsMap = ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, null, null, - new BaseConfiguration()), currentAssignment); + new RebalanceConfig()), currentAssignment); // Rebalance with COMPLETED instance partitions should relocate all COMPLETED (ONLINE) segments to the COMPLETED // instances Map> newAssignment = - _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, - new BaseConfiguration()); + _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new RebalanceConfig()); assertEquals(newAssignment.size(), NUM_SEGMENTS + numUploadedSegments + 1); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { @@ -270,8 +268,8 @@ public void testRelocateCompletedSegments() { assertEquals(totalNumSegmentsAssigned, expectedTotalNumSegmentsAssigned); // Rebalance with COMPLETED instance partitions including CONSUMING segments should give the same assignment - BaseConfiguration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setIncludeConsuming(true); assertEquals( _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, rebalanceConfig), newAssignment); @@ -279,11 +277,11 @@ public void testRelocateCompletedSegments() { // Rebalance without COMPLETED instance partitions again should change the segment assignment back currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null, - new BaseConfiguration()), currentAssignment); + new RebalanceConfig()), currentAssignment); // Bootstrap table without COMPLETED instance partitions should be the same as regular rebalance - rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, null, null, rebalanceConfig), currentAssignment); assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null, diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java index f540350e647..e03a8d4a46a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.commons.configuration.BaseConfiguration; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.tier.PinotServerTierStorage; @@ -33,13 +32,13 @@ import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.tier.TierSegmentSelector; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -184,8 +183,7 @@ public void testRelocateCompletedSegments() { // Rebalance without tier instancePartitions moves all instances to COMPLETED Map> newAssignment = - _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, - new BaseConfiguration()); + _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new RebalanceConfig()); assertEquals(newAssignment.size(), NUM_SEGMENTS); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { // ONLINE segments @@ -208,7 +206,7 @@ public void testRelocateCompletedSegments() { int expectedOnTierB = 20; int expectedOnCompleted = NUM_SEGMENTS - NUM_PARTITIONS - expectedOnTierA - expectedOnTierB; newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, _sortedTiers, - _tierInstancePartitionsMap, new BaseConfiguration()); + _tierInstancePartitionsMap, new RebalanceConfig()); assertEquals(newAssignment.size(), NUM_SEGMENTS); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { @@ -259,8 +257,8 @@ public void testRelocateCompletedSegments() { } // Rebalance with including CONSUMING should give the same assignment - BaseConfiguration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setIncludeConsuming(true); assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, _sortedTiers, _tierInstancePartitionsMap, rebalanceConfig), newAssignment); @@ -270,13 +268,13 @@ public void testRelocateCompletedSegments() { noRelocationInstancePartitionsMap.put(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null, - new BaseConfiguration()), currentAssignment); + new RebalanceConfig()), currentAssignment); // Rebalance without COMPLETED instance partitions and with tierInstancePartitions should move ONLINE segments to // Tiers and CONSUMING segments to CONSUMING tenant. newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, _sortedTiers, - _tierInstancePartitionsMap, new BaseConfiguration()); + _tierInstancePartitionsMap, new RebalanceConfig()); numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, INSTANCES_TIER_A); @@ -299,8 +297,8 @@ public void testRelocateCompletedSegments() { assertEquals(numSegmentsAssignedPerInstance.length, NUM_CONSUMING_INSTANCES); // Bootstrap - rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, _sortedTiers, _tierInstancePartitionsMap, rebalanceConfig); int index = 0; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index a6da0552cd8..2c590ef25e3 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -25,12 +25,12 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.commons.configuration.BaseConfiguration; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -38,7 +38,6 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -214,13 +213,12 @@ public void testRelocateCompletedSegments() { Map noRelocationInstancePartitionsMap = ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, null, null, - new BaseConfiguration()), currentAssignment); + new RebalanceConfig()), currentAssignment); // Rebalance with COMPLETED instance partitions should relocate all COMPLETED (ONLINE) segments to the COMPLETED // instances Map> newAssignment = - _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, - new BaseConfiguration()); + _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new RebalanceConfig()); assertEquals(newAssignment.size(), NUM_SEGMENTS + uploadedSegmentNames.size() + 1); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { @@ -255,19 +253,19 @@ public void testRelocateCompletedSegments() { } // Rebalance with COMPLETED instance partitions including CONSUMING segments should give the same assignment - BaseConfiguration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setIncludeConsuming(true); assertEquals( _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, rebalanceConfig), newAssignment); // Rebalance without COMPLETED instance partitions again should change the segment assignment back assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null, - new BaseConfiguration()), currentAssignment); + new RebalanceConfig()), currentAssignment); // Bootstrap table without COMPLETED instance partitions should be the same as regular rebalance - rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, null, null, rebalanceConfig), currentAssignment); assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null, @@ -417,8 +415,8 @@ public void testExplicitPartition() { Map instancePartitionsMap = ImmutableMap.of(InstancePartitionsType.CONSUMING, consumingInstancePartitions, InstancePartitionsType.COMPLETED, completedInstancePartitions); - BaseConfiguration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setIncludeConsuming(true); Map> newAssignment = _segmentAssignment.rebalanceTable(uploadedCurrentAssignment, instancePartitionsMap, null, null, rebalanceConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java index 7a75f8350da..f8df6b53eb3 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java @@ -35,6 +35,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; @@ -124,7 +125,7 @@ public void testSegmentAssignmentAndRebalance() { when(dataAccessor.getChildValues(builder.instanceConfigs(), true)).thenReturn(instanceConfigList); Map> newAssignment = - _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, null); + _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new RebalanceConfig()); assertEquals(newAssignment.get(SEGMENT_NAME).size(), NUM_INSTANCES - 1); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java index 277043230fd..c62f03fef21 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java @@ -24,19 +24,17 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.controller.helix.core.assignment.segment.OfflineSegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -129,8 +127,8 @@ public void testTableBalanced() { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals(_segmentAssignment - .rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new BaseConfiguration()), + assertEquals( + _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new RebalanceConfig()), currentAssignment); } @@ -145,8 +143,8 @@ public void testBootstrapTable() { } // Bootstrap table should reassign all segments based on their alphabetical order - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); Map> newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, rebalanceConfig); assertEquals(newAssignment.size(), NUM_SEGMENTS); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java index e944a3ab2f0..ea4c88774b4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -39,6 +37,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -46,7 +45,6 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.BeforeClass; @@ -260,10 +258,9 @@ public void testTableBalancedWithoutPartition() { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals(_segmentAssignmentWithoutPartition - .rebalanceTable(currentAssignment, _instancePartitionsMapWithoutPartition, null, null, - new BaseConfiguration()), - currentAssignment); + assertEquals( + _segmentAssignmentWithoutPartition.rebalanceTable(currentAssignment, _instancePartitionsMapWithoutPartition, + null, null, new RebalanceConfig()), currentAssignment); } @Test @@ -288,10 +285,9 @@ public void testTableBalancedWithPartition() { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals(_segmentAssignmentWithPartition - .rebalanceTable(currentAssignment, _instancePartitionsMapWithPartition, null, null, - new BaseConfiguration()), - currentAssignment); + assertEquals( + _segmentAssignmentWithPartition.rebalanceTable(currentAssignment, _instancePartitionsMapWithPartition, null, + null, new RebalanceConfig()), currentAssignment); } @Test @@ -305,10 +301,11 @@ public void testBootstrapTableWithoutPartition() { } // Bootstrap table should reassign all segments based on their alphabetical order - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); - Map> newAssignment = _segmentAssignmentWithoutPartition - .rebalanceTable(currentAssignment, _instancePartitionsMapWithoutPartition, null, null, rebalanceConfig); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); + Map> newAssignment = + _segmentAssignmentWithoutPartition.rebalanceTable(currentAssignment, _instancePartitionsMapWithoutPartition, + null, null, rebalanceConfig); assertEquals(newAssignment.size(), NUM_SEGMENTS); List sortedSegments = new ArrayList<>(SEGMENTS); sortedSegments.sort(null); @@ -328,10 +325,11 @@ public void testBootstrapTableWithPartition() { } // Bootstrap table should reassign all segments based on their alphabetical order within the partition - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); - Map> newAssignment = _segmentAssignmentWithPartition - .rebalanceTable(currentAssignment, _instancePartitionsMapWithPartition, null, null, rebalanceConfig); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); + Map> newAssignment = + _segmentAssignmentWithPartition.rebalanceTable(currentAssignment, _instancePartitionsMapWithPartition, null, + null, rebalanceConfig); assertEquals(newAssignment.size(), NUM_SEGMENTS); int numSegmentsPerPartition = NUM_SEGMENTS / NUM_PARTITIONS; String[][] partitionIdToSegmentsMap = new String[NUM_PARTITIONS][numSegmentsPerPartition]; @@ -363,9 +361,9 @@ public void testRebalanceTableWithPartitionColumnAndInstancePartitionsMapWithOne SEGMENTS.forEach(segName -> unbalancedAssignment.put(segName, ImmutableMap .of(instance0, SegmentStateModel.ONLINE, instance1, SegmentStateModel.ONLINE, instance2, SegmentStateModel.ONLINE))); - Map> balancedAssignment = _segmentAssignmentWithPartition - .rebalanceTable(unbalancedAssignment, _instancePartitionsMapWithoutPartition, null, null, - new BaseConfiguration()); + Map> balancedAssignment = + _segmentAssignmentWithPartition.rebalanceTable(unbalancedAssignment, _instancePartitionsMapWithoutPartition, + null, null, new RebalanceConfig()); int[] actualNumSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(balancedAssignment, INSTANCES); int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES]; @@ -451,13 +449,13 @@ public void testOneReplicaWithPartition() { // Current assignment should already be balanced assertEquals( - segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, null, null, new BaseConfiguration()), + segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, null, null, new RebalanceConfig()), currentAssignment); // Test bootstrap // Bootstrap table should reassign all segments based on their alphabetical order within the partition - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBootstrap(true); Map> newAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, null, null, rebalanceConfig); assertEquals(newAssignment.size(), NUM_SEGMENTS); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 8d6da2681a8..72a581d96b0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -24,8 +24,6 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.tier.TierFactory; @@ -42,7 +40,6 @@ import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.tenant.TenantRole; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.AfterClass; @@ -100,7 +97,7 @@ public void testRebalance() new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); // Rebalance should fail without creating the table - RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); // Create the table @@ -116,7 +113,7 @@ public void testRebalance() _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); // Rebalance should return NO_OP status - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // All servers should be assigned to the table @@ -139,8 +136,8 @@ public void testRebalance() } // Rebalance in dry-run mode - Configuration rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, true); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); @@ -169,8 +166,8 @@ public void testRebalance() oldSegmentAssignment); // Rebalance with 3 min available replicas should fail as the table only have 3 replicas - rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, 3); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setMinAvailableReplicas(3); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); @@ -179,8 +176,8 @@ public void testRebalance() oldSegmentAssignment); // Rebalance with 2 min available replicas should succeed - rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, 2); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setMinAvailableReplicas(2); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); @@ -201,7 +198,7 @@ public void testRebalance() _helixResourceManager.updateTableConfig(tableConfig); // No need to reassign instances because instances should be automatically assigned when updating the table config - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); // There should be 3 replica-groups, each with 2 servers @@ -248,15 +245,15 @@ public void testRebalance() // Without instances reassignment, the rebalance should return status NO_OP, and the existing instance partitions // should be used - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment); assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); // With instances reassignment, the rebalance should return status DONE, the existing instance partitions should be // removed, and the default instance partitions should be used - rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, true); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setReassignInstances(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, @@ -283,8 +280,8 @@ public void testRebalance() } // Rebalance with downtime should succeed - rebalanceConfig = new BaseConfiguration(); - rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, true); + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDowntime(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); @@ -348,7 +345,7 @@ public void testRebalanceWithTiers() _helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields(); TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); - RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // Segment assignment should not change assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); @@ -364,7 +361,7 @@ public void testRebalanceWithTiers() _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, TIER_B_NAME, 3, 3, 0)); // rebalance is NOOP and no change in assignment caused by new instances - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // Segment assignment should not change assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); @@ -382,7 +379,7 @@ public void testRebalanceWithTiers() _helixResourceManager.updateTableConfig(tableConfig); // rebalance should change assignment - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); // check that segments have moved to tiers @@ -438,7 +435,7 @@ public void testRebalanceWithTiersAndInstanceAssignments() _helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields(); TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); - RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // Segment assignment should not change assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); @@ -450,7 +447,7 @@ public void testRebalanceWithTiersAndInstanceAssignments() } _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, "replicaAssignment" + TIER_A_NAME, 6, 6, 0)); // rebalance is NOOP and no change in assignment caused by new instances - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // Segment assignment should not change assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); @@ -462,7 +459,7 @@ public void testRebalanceWithTiersAndInstanceAssignments() _helixResourceManager.updateTableConfig(tableConfig); // rebalance should change assignment - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); // check that segments have moved to tier a @@ -484,7 +481,7 @@ public void testRebalanceWithTiersAndInstanceAssignments() new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); _helixResourceManager.updateTableConfig(tableConfig); - rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig()); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME)); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java index a4237fee3fc..1649d9a922f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java @@ -29,13 +29,13 @@ import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.AfterClass; @@ -93,10 +93,10 @@ public void testRebalance() _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields(); // rebalance the tables on test tenant - TenantRebalanceContext context = new TenantRebalanceContext(); - context.setTenantName(TENANT_NAME); - context.setVerboseResult(true); - TenantRebalanceResult result = tenantRebalancer.rebalance(context); + TenantRebalanceConfig config = new TenantRebalanceConfig(); + config.setTenantName(TENANT_NAME); + config.setVerboseResult(true); + TenantRebalanceResult result = tenantRebalancer.rebalance(config); RebalanceResult rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B); Map> rebalancedAssignment = rebalanceResult.getSegmentAssignment(); // assignment should not change, with a NO_OP status as no now server is added to test tenant @@ -104,8 +104,8 @@ public void testRebalance() assertEquals(oldSegmentAssignment, rebalancedAssignment); // rebalance the tables on default tenant - context.setTenantName(DEFAULT_TENANT_NAME); - result = tenantRebalancer.rebalance(context); + config.setTenantName(DEFAULT_TENANT_NAME); + result = tenantRebalancer.rebalance(config); // rebalancing default tenant should distribute the segment of table A over 6 servers rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A); InstancePartitions partitions = rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE); @@ -148,7 +148,8 @@ private TenantRebalanceProgressStats getProgress(String jobId) if (controllerJobZKMetadata == null) { return null; } - return JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS), + return JsonUtils.stringToObject( + controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS), TenantRebalanceProgressStats.class); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java deleted file mode 100644 index 37cd4f3f254..00000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.utils; - -/** - * Constants for rebalance config properties - */ -public class RebalanceConfigConstants { - private RebalanceConfigConstants() { - } - - // Unique Id for rebalance - public static final String JOB_ID = "jobId"; - - // Progress of the Rebalance operartion - public static final String REBALANCE_PROGRESS_STATS = "REBALANCE_PROGRESS_STATS"; - - // Whether to rebalance table in dry-run mode - public static final String DRY_RUN = "dryRun"; - public static final boolean DEFAULT_DRY_RUN = false; - - // Whether to reassign instances before reassigning segments - public static final String REASSIGN_INSTANCES = "reassignInstances"; - public static final boolean DEFAULT_REASSIGN_INSTANCES = false; - - // Whether to reassign CONSUMING segments - public static final String INCLUDE_CONSUMING = "includeConsuming"; - public static final boolean DEFAULT_INCLUDE_CONSUMING = false; - - // Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, reassign all segments in a - // round-robin fashion as if adding new segments to an empty table) - public static final String BOOTSTRAP = "bootstrap"; - public static final boolean DEFAULT_BOOTSTRAP = false; - - // Whether to allow downtime for the rebalance - public static final String DOWNTIME = "downtime"; - public static final boolean DEFAULT_DOWNTIME = false; - - // For no-downtime rebalance, minimum number of replicas to keep alive during rebalance, or maximum number of replicas - // allowed to be unavailable if value is negative - public static final String MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = "minReplicasToKeepUpForNoDowntime"; - public static final int DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 1; - - // Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot be achieved) - // When using best-efforts to rebalance, the following scenarios won't fail the rebalance (will log warnings instead): - // - Segment falls into ERROR state in ExternalView -> count ERROR state as good state - // - ExternalView has not converged within the maximum wait time -> continue to the next stage - public static final String BEST_EFFORTS = "bestEfforts"; - public static final boolean DEFAULT_BEST_EFFORTS = false; - - // The check on external view can be very costly when the table has very large ideal and external states, i.e. when - // having a huge number of segments. These two configs help reduce the cpu load on controllers, e.g. by doing the - // check less frequently and bail out sooner to rebalance at best effort if configured so. - public static final String EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = "externalViewCheckIntervalInMs"; - public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 1_000L; // 1 second - public static final String EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = "externalViewStabilizationTimeoutInMs"; - public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 60 * 60_000L; // 1 hour - public static final String UPDATE_TARGET_TIER = "updateTargetTier"; - public static final boolean DEFAULT_UPDATE_TARGET_TIER = false; -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 3a572932882..a4c85505dc6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -28,7 +28,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.spi.utils.StringUtil; @@ -199,29 +198,26 @@ public String forUpdateTableConfig(String tableName) { } public String forTableRebalance(String tableName, String tableType) { - return forTableRebalance(tableName, tableType, RebalanceConfigConstants.DEFAULT_DRY_RUN, - RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES, RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING, - RebalanceConfigConstants.DEFAULT_DOWNTIME, - RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME); + return forTableRebalance(tableName, tableType, false, false, false, false, 1); } public String forTableRebalance(String tableName, String tableType, boolean dryRun, boolean reassignInstances, boolean includeConsuming, boolean downtime, int minAvailableReplicas) { StringBuilder stringBuilder = new StringBuilder(StringUtil.join("/", _baseUrl, "tables", tableName, "rebalance?type=" + tableType)); - if (dryRun != RebalanceConfigConstants.DEFAULT_DRY_RUN) { + if (dryRun) { stringBuilder.append("&dryRun=").append(dryRun); } - if (reassignInstances != RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES) { + if (reassignInstances) { stringBuilder.append("&reassignInstances=").append(reassignInstances); } - if (includeConsuming != RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING) { + if (includeConsuming) { stringBuilder.append("&includeConsuming=").append(includeConsuming); } - if (downtime != RebalanceConfigConstants.DEFAULT_DOWNTIME) { + if (downtime) { stringBuilder.append("&downtime=").append(downtime); } - if (minAvailableReplicas != RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME) { + if (minAvailableReplicas != 1) { stringBuilder.append("&minAvailableReplicas=").append(minAvailableReplicas); } return stringBuilder.toString(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java index 6d1d58f7435..0a682427037 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java @@ -19,39 +19,33 @@ package org.apache.pinot.tools; import com.google.common.base.Preconditions; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; /** * Helper class for pinot-admin tool's RebalanceTable command. */ public class PinotTableRebalancer extends PinotZKChanger { - private final Configuration _rebalanceConfig = new BaseConfiguration(); + private final RebalanceConfig _rebalanceConfig = new RebalanceConfig(); public PinotTableRebalancer(String zkAddress, String clusterName, boolean dryRun, boolean reassignInstances, boolean includeConsuming, boolean bootstrap, boolean downtime, int minReplicasToKeepUpForNoDowntime, boolean bestEffort, long externalViewCheckIntervalInMs, long externalViewStabilizationTimeoutInMs) { super(zkAddress, clusterName); - _rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun); - _rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, reassignInstances); - _rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, includeConsuming); - _rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, bootstrap); - _rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime); - _rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, - minReplicasToKeepUpForNoDowntime); - _rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, bestEffort); - _rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, - externalViewCheckIntervalInMs); - _rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, - externalViewStabilizationTimeoutInMs); - _rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, - TableRebalancer.createUniqueRebalanceJobIdentifier()); + _rebalanceConfig.setDryRun(dryRun); + _rebalanceConfig.setReassignInstances(reassignInstances); + _rebalanceConfig.setIncludeConsuming(includeConsuming); + _rebalanceConfig.setBootstrap(bootstrap); + _rebalanceConfig.setDowntime(downtime); + _rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime); + _rebalanceConfig.setBestEfforts(bestEffort); + _rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs); + _rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs); + _rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier()); } public RebalanceResult rebalance(String tableNameWithType) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java index 258c66cea23..5b4c3260d4e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java @@ -18,9 +18,9 @@ */ package org.apache.pinot.tools.admin.command; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.apache.pinot.tools.Command; import org.apache.pinot.tools.PinotTableRebalancer; import org.slf4j.Logger; @@ -80,12 +80,12 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C @CommandLine.Option(names = {"-externalViewCheckIntervalInMs"}, description = "How often to check if external view converges with ideal view") - private long _externalViewCheckIntervalInMs = RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS; + private long _externalViewCheckIntervalInMs = RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS; @CommandLine.Option(names = {"-externalViewStabilizationTimeoutInMs"}, description = "How long to wait till external view converges with ideal view") private long _externalViewStabilizationTimeoutInMs = - RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS; + RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS; @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, description = "Print this message") private boolean _help = false;