Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
justinlin-linkedin committed Aug 8, 2023
1 parent 1146936 commit d024e70
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
logger.error("No Disk is available to host bootstrap replica. Cannot create the replica.");
return null;
}
boolean decreasedDiskSpace = false;
try {
AmbryPartition mappedPartition =
new AmbryPartition(Long.parseLong(partitionIdStr), clusterMapConfig.clusterMapDefaultPartitionClass,
Expand All @@ -813,6 +814,7 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
replicaCapacity = currentPartition.getReplicaIds().get(0).getCapacityInBytes();
}
disk.decreaseAvailableSpaceInBytes(replicaCapacity);
decreasedDiskSpace = true;
AmbryServerReplica replica =
new AmbryServerReplica(clusterMapConfig, currentPartition, disk, true, replicaCapacity,
ReplicaSealStatus.NOT_SEALED);
Expand All @@ -823,7 +825,9 @@ private ReplicaId getBootstrapReplicaInFullAuto(String partitionIdStr, DataNodeI
dataNodeId, e);
// We have decreased the available space on the disk since we thought that it will be used to host replica. Since
// bootstrapping replica failed, increase the available disk space back.
disk.increaseAvailableSpaceInBytes(replicaCapacity);
if (decreasedDiskSpace) {
disk.increaseAvailableSpaceInBytes(replicaCapacity);
}
return null;
}
}
Expand Down Expand Up @@ -856,15 +860,16 @@ private long getReplicaCapacityFromResourceConfig(String partitionIdStr, DataNod
logger.info("Fetching resource config for {} to create bootstrap replica for partition {}", resourceName,
partitionIdStr);
ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, resourceName);
resourceConfigs.putIfAbsent(resourceName, resourceConfig);
if (resourceConfig != null) {
resourceConfigs.putIfAbsent(resourceName, resourceConfig);
}
}
ResourceConfig resourceConfig = resourceConfigs.get(resourceName);
String defaultReplicaCapacityStr = resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR);
if (defaultReplicaCapacityStr == null) {
if (resourceConfig == null || resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR) == null) {
throw new IllegalArgumentException("Missing default replica capacity from resource " + resourceName
+ " when creating bootstrap replica for partition " + partitionIdStr);
}
return Long.parseLong(defaultReplicaCapacityStr);
return Long.parseLong(resourceConfig.getSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.model.IdealState;
Expand Down Expand Up @@ -638,16 +639,40 @@ public void getNewReplicaInFullAutoBasicTest() throws Exception {
}

// Set ZNRecord is NULL in Helix PropertyStore
HelixClusterManager helixClusterManager = new HelixClusterManager(clusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, null, useAggregatedView), metricRegistry);
MockHelixManagerFactory helixFactory = new MockHelixManagerFactory(helixCluster, null, null, useAggregatedView);
HelixClusterManager helixClusterManager =
new HelixClusterManager(clusterMapConfig, selfInstanceName, helixFactory, metricRegistry);

// Case 1. We are creating a bootstrap replica for an exiting partition
PartitionId partitionOfNewReplica = helixClusterManager.getAllPartitionIds(null).get(0);
long expectedCapacity = partitionOfNewReplica.getReplicaIds().get(0).getCapacityInBytes();
AmbryDataNode ambryDataNode = helixClusterManager.getDataNodeId(currentNode.getHostname(), currentNode.getPort());
// 1. Verify bootstrap replica should be successful
assertNotNull("New replica should be created successfully",
helixClusterManager.getBootstrapReplica(partitionOfNewReplica.toPathString(), ambryDataNode));
ReplicaId bootstrapReplica =
helixClusterManager.getBootstrapReplica(partitionOfNewReplica.toPathString(), ambryDataNode);
assertNotNull("New replica should be created successfully", bootstrapReplica);
assertEquals("Bootstrap replica of existing partition should has peers' capacity", expectedCapacity,
bootstrapReplica.getCapacityInBytes());
assertEquals("There should be exactly one entry in bootstrap replica map", 1,
helixClusterManager.getBootstrapReplicaMap().size());

// Case 2. We are creating a bootstrap replica for a new partition
String newPartitionId = String.valueOf(idealState.getNumPartitions() + 1000);
// Case 2.1 there is no resource config
assertNull("Missing resource config", helixClusterManager.getBootstrapReplica(newPartitionId, ambryDataNode));
// Case 2.1 resource config created
ResourceConfig resourceConfig = new ResourceConfig.Builder(resourceName).build();
resourceConfig.putSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR, String.valueOf(3 * DEFAULT_REPLICA_CAPACITY_IN_BYTES));
ConfigAccessor configAccessor =
helixFactory.getZKHelixManager(clusterMapConfig.clusterMapClusterName, selfInstanceName, InstanceType.SPECTATOR,
parseDcJsonAndPopulateDcInfo(clusterMapConfig.clusterMapDcsZkConnectStrings).get(localDc)
.getZkConnectStrs()
.get(0)).getConfigAccessor();
configAccessor.setResourceConfig(clusterMapConfig.clusterMapClusterName, resourceName, resourceConfig);
newPartitionId = String.valueOf(idealState.getNumPartitions() + 1001);
bootstrapReplica = helixClusterManager.getBootstrapReplica(newPartitionId, ambryDataNode);
assertNotNull(bootstrapReplica);
assertEquals(3 * DEFAULT_REPLICA_CAPACITY_IN_BYTES, bootstrapReplica.getCapacityInBytes());

helixClusterManager.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
Expand Down Expand Up @@ -55,12 +56,15 @@
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

import static org.mockito.Mockito.*;


/**
* A mock implementation of the {@link HelixManager} to use in tests.
Expand All @@ -82,6 +86,8 @@ class MockHelixManager implements HelixManager {
private HelixPropertyStore<ZNRecord> helixPropertyStore;
private boolean isAggregatedViewCluster;
private final List<MockHelixAdmin> helixAdminList;
private final ConfigAccessor configAccessor;
private final Map<String, ResourceConfig> resourceConfigs = new ConcurrentHashMap<>();

/**
* Instantiate a MockHelixManager.
Expand Down Expand Up @@ -140,6 +146,18 @@ class MockHelixManager implements HelixManager {
}
}
this.isAggregatedViewCluster = isAggregatedViewCluster;

configAccessor = mock(ConfigAccessor.class);
doAnswer(invocation -> {
String resourceName = invocation.getArgument(1);
return resourceConfigs.get(resourceName);
}).when(configAccessor).getResourceConfig(anyString(), anyString());
doAnswer(invocation -> {
String resourceName = invocation.getArgument(1);
ResourceConfig resourceConfig = invocation.getArgument(2);
resourceConfigs.put(resourceName, resourceConfig);
return null;
}).when(configAccessor).setResourceConfig(anyString(), anyString(), any(ResourceConfig.class));
}

@Override
Expand Down Expand Up @@ -432,7 +450,7 @@ public HelixDataAccessor getHelixDataAccessor() {

@Override
public ConfigAccessor getConfigAccessor() {
throw new IllegalStateException("Not implemented");
return configAccessor;
}

@Override
Expand Down

0 comments on commit d024e70

Please sign in to comment.