Skip to content

Commit

Permalink
cr
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Oct 4, 2023
1 parent 0e21363 commit 6b3bbd4
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -960,12 +961,13 @@ public boolean enableSegmentRelocatorLocalTierMigration() {
}

public long getSegmentRelocatorExternalViewCheckIntervalInMs() {
return getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, 1000L);
return getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS);
}

public long getSegmentRelocatorExternalViewStabilizationTimeoutInMs() {
return getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
3600000L);
RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS);
}

public boolean isSegmentRelocatorRebalanceTablesSequentially() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,9 @@ public ServerRebalanceJobStatusResponse rebalanceStatus(
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId,
Response.Status.NOT_FOUND);
}
TableRebalanceProgressStats tableRebalanceProgressStats =
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceJobConstants.JOB_STATS_KEY_REBALANCE_PROGRESS),
TableRebalanceProgressStats.class);
TableRebalanceProgressStats tableRebalanceProgressStats = JsonUtils.stringToObject(
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
TableRebalanceProgressStats.class);
long timeSinceStartInSecs = 0L;
if (!RebalanceResult.Status.DONE.toString().equals(tableRebalanceProgressStats.getStatus())) {
timeSinceStartInSecs = (System.currentTimeMillis() - tableRebalanceProgressStats.getStartTimeMs()) / 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,9 @@ public TenantRebalanceJobStatusResponse rebalanceStatus(
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId,
Response.Status.NOT_FOUND);
}
TenantRebalanceProgressStats tenantRebalanceProgressStats =
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceJobConstants.JOB_STATS_KEY_REBALANCE_PROGRESS),
TenantRebalanceProgressStats.class);
TenantRebalanceProgressStats tenantRebalanceProgressStats = JsonUtils.stringToObject(
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
TenantRebalanceProgressStats.class);
long timeSinceStartInSecs = tenantRebalanceProgressStats.getTimeToFinishInSeconds();
if (tenantRebalanceProgressStats.getCompletionStatusMsg() == null) {
timeSinceStartInSecs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@

@ApiModel
public class RebalanceConfig {
public static final int DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 1;
public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 1000L; // 1 second
public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 3600000L; // 1 hour

// Whether to rebalance table in dry-run mode
@JsonProperty("dryRun")
@ApiModelProperty(example = "false")
Expand Down Expand Up @@ -55,7 +59,7 @@ public class RebalanceConfig {
// allowed to be unavailable if value is negative
@JsonProperty("minAvailableReplicas")
@ApiModelProperty(example = "1")
private int _minAvailableReplicas = 1;
private int _minAvailableReplicas = DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME;

// Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot be achieved)
// When using best-efforts to rebalance, the following scenarios won't fail the rebalance (will log warnings instead):
Expand All @@ -70,11 +74,11 @@ public class RebalanceConfig {
// check less frequently and bail out sooner to rebalance at best effort if configured so.
@JsonProperty("externalViewCheckIntervalInMs")
@ApiModelProperty(example = "1000")
private long _externalViewCheckIntervalInMs = 1000L;
private long _externalViewCheckIntervalInMs = DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;

@JsonProperty("externalViewStabilizationTimeoutInMs")
@ApiModelProperty(example = "3600000")
private long _externalViewStabilizationTimeoutInMs = 3600000L;
private long _externalViewStabilizationTimeoutInMs = DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;

@JsonProperty("updateTargetTier")
@ApiModelProperty(example = "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ private RebalanceJobConstants() {
}

// Progress status of the rebalance operation
public static final String JOB_STATS_KEY_REBALANCE_PROGRESS = "REBALANCE_PROGRESS_STATS";
public static final String JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS = "REBALANCE_PROGRESS_STATS";
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private void trackStatsInZk() {
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_STATS_KEY_REBALANCE_PROGRESS,
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(_tableRebalanceProgressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _rebalanceJobId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.controller.helix.core.rebalance.tenant;


public interface TenantRebalancer {
TenantRebalanceResult rebalance(TenantRebalanceConfig context);
TenantRebalanceResult rebalance(TenantRebalanceConfig config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ private void trackStatsInZk() {
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_STATS_KEY_REBALANCE_PROGRESS, JsonUtils.objectToString(_progressStats));
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(_progressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _jobId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,19 @@ public void testRebalance()
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();

// rebalance the tables on test tenant
TenantRebalanceConfig context = new TenantRebalanceConfig();
context.setTenantName(TENANT_NAME);
context.setVerboseResult(true);
TenantRebalanceResult result = tenantRebalancer.rebalance(context);
TenantRebalanceConfig config = new TenantRebalanceConfig();
config.setTenantName(TENANT_NAME);
config.setVerboseResult(true);
TenantRebalanceResult result = tenantRebalancer.rebalance(config);
RebalanceResult rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
Map<String, Map<String, String>> rebalancedAssignment = rebalanceResult.getSegmentAssignment();
// assignment should not change, with a NO_OP status as no new server is added to test tenant
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
assertEquals(oldSegmentAssignment, rebalancedAssignment);

// rebalance the tables on default tenant
context.setTenantName(DEFAULT_TENANT_NAME);
result = tenantRebalancer.rebalance(context);
config.setTenantName(DEFAULT_TENANT_NAME);
result = tenantRebalancer.rebalance(config);
// rebalancing default tenant should distribute the segment of table A over 6 servers
rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
InstancePartitions partitions = rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
Expand Down Expand Up @@ -148,7 +148,8 @@ private TenantRebalanceProgressStats getProgress(String jobId)
if (controllerJobZKMetadata == null) {
return null;
}
return JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceJobConstants.JOB_STATS_KEY_REBALANCE_PROGRESS),
return JsonUtils.stringToObject(
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
TenantRebalanceProgressStats.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.admin.command;

import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Command;
Expand Down Expand Up @@ -79,11 +80,12 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C

@CommandLine.Option(names = {"-externalViewCheckIntervalInMs"},
description = "How often to check if external view converges with ideal view")
private long _externalViewCheckIntervalInMs = 1000L;
private long _externalViewCheckIntervalInMs = RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;

@CommandLine.Option(names = {"-externalViewStabilizationTimeoutInMs"},
description = "How long to wait till external view converges with ideal view")
private long _externalViewStabilizationTimeoutInMs = 3600000L;
private long _externalViewStabilizationTimeoutInMs =
RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;

@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, description = "Print this message")
private boolean _help = false;
Expand Down

0 comments on commit 6b3bbd4

Please sign in to comment.