Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Oct 3, 2023
1 parent 8a0f6ea commit fe411ba
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,23 @@ public void setJobId(String jobId) {
_jobId = jobId;
}

// Helper method to deprecate the use of Configuration to keep rebalance configs.
public static RebalanceConfig copy(RebalanceConfig cfg) {
RebalanceConfig rc = new RebalanceConfig();
rc._dryRun = cfg._dryRun;
rc._reassignInstances = cfg._reassignInstances;
rc._includeConsuming = cfg._includeConsuming;
rc._bootstrap = cfg._bootstrap;
rc._downtime = cfg._downtime;
rc._minAvailableReplicas = cfg._minAvailableReplicas;
rc._bestEfforts = cfg._bestEfforts;
rc._externalViewCheckIntervalInMs = cfg._externalViewCheckIntervalInMs;
rc._externalViewStabilizationTimeoutInMs = cfg._externalViewStabilizationTimeoutInMs;
rc._updateTargetTier = cfg._updateTargetTier;
rc._jobId = cfg._jobId;
return rc;
}

// Helper method to help deprecate the use of Configuration to keep rebalance configs.
public static RebalanceConfig fromConfiguration(Configuration cfg) {
RebalanceConfig rc = new RebalanceConfig();
rc.setDryRun(cfg.getBoolean(RebalanceConfigConstants.DRY_RUN, RebalanceConfigConstants.DEFAULT_DRY_RUN));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
Set<String> tables = getTenantTables(config.getTenantName());
tables.forEach(table -> {
try {
config.setJobId(createUniqueRebalanceJobIdentifier());
config.setDryRun(true);
rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
rebalanceConfig.setJobId(createUniqueRebalanceJobIdentifier());
rebalanceConfig.setDryRun(true);
rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, false));
} catch (TableNotFoundException exception) {
rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
null, null, null));
Expand Down Expand Up @@ -129,21 +130,23 @@ public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
if (table == null) {
break;
}
config.setDryRun(false);
config.setJobId(rebalanceResult.get(table).getJobId());
rebalanceTable(table, config, observer);
RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
rebalanceConfig.setDryRun(false);
rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId());
rebalanceTable(table, rebalanceConfig, observer);
}
// Last parallel thread to finish the table rebalance job will pick up the
// sequential table rebalance execution
if (activeThreads.decrementAndGet() == 0) {
config.setDryRun(false);
RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
rebalanceConfig.setDryRun(false);
while (true) {
String table = sequentialQueue.pollFirst();
if (table == null) {
break;
}
config.setJobId(rebalanceResult.get(table).getJobId());
rebalanceTable(table, config, observer);
rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId());
rebalanceTable(table, rebalanceConfig, observer);
}
observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.tier.PinotServerTierStorage;
Expand Down Expand Up @@ -212,7 +213,10 @@ public void testTableBalanced() {
@Test
public void testBootstrapTable() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
// The list of segments are segment_0, segment_1, ... segment_10, ...; but TreeMap sorts segments as segment_0,
// segment_1, segment_10, ... segment_2, ... So to make bootstrap generate same assignment as assigning each
// segment separately, we need to process the segments in the same order.
for (String segmentName : new TreeSet<>(SEGMENTS)) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap);
currentAssignment.put(segmentName,
Expand Down

0 comments on commit fe411ba

Please sign in to comment.