Skip to content

Commit

Permalink
Use AA + IncPush cluster level configs
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Jul 26, 2023
1 parent 4dee08a commit ae3db5c
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE;
import static com.linkedin.venice.ConfigKeys.ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY_STORE;
import static com.linkedin.venice.ConfigKeys.ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_HYBRID_STORE;
import static com.linkedin.venice.ConfigKeys.ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES;
import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC;
Expand All @@ -16,10 +15,11 @@
import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_KAFKA_PRODUCER_ENABLED;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME;
import static com.linkedin.venice.utils.TestUtils.assertCommand;
import static com.linkedin.venice.utils.TestUtils.createAndVerifyStoreInAllRegions;
import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
Expand All @@ -44,7 +44,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand All @@ -53,7 +52,7 @@ public class IncrementalPushGaE2ETest {
private static final int TEST_TIMEOUT = 5 * Time.MS_PER_MINUTE;
private static final int PUSH_TIMEOUT = TEST_TIMEOUT / 2;

protected static final int NUMBER_OF_CHILD_DATACENTERS = 3;
protected static final int NUMBER_OF_CHILD_DATACENTERS = 2;
protected static final int NUMBER_OF_CLUSTERS = 1;
static final String[] CLUSTER_NAMES =
IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new);
Expand All @@ -68,15 +67,13 @@ public class IncrementalPushGaE2ETest {
private ControllerClient parentControllerClient;
private ControllerClient dc0Client;
private ControllerClient dc1Client;
private ControllerClient dc2Client;
private List<ControllerClient> dcControllerClientList;

@BeforeClass(alwaysRun = true)
public void setUp() {
/**
* Reduce leader promotion delay to 1 second;
* Create a testing environment with 1 parent fabric and 3 child fabrics;
* Set server and replication factor to 2 to ensure at least 1 leader replica and 1 follower replica;
* Create a testing environment with 1 parent fabric and 2 child fabrics;
*/
serverProperties = new Properties();
serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L);
Expand All @@ -91,7 +88,6 @@ public void setUp() {
controllerProps.put(PARENT_KAFKA_CLUSTER_FABRIC_LIST, DEFAULT_PARENT_DATA_CENTER_REGION_NAME);
controllerProps.put(ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES, true);
controllerProps.put(ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_HYBRID_STORE, true);
controllerProps.put(ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY_STORE, true);
controllerProps.put(PARTICIPANT_MESSAGE_STORE_ENABLED, false);
controllerProps.put(CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE, false);
controllerProps.put(CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE, false);
Expand All @@ -101,9 +97,9 @@ public void setUp() {
NUMBER_OF_CLUSTERS,
1,
1,
2,
1,
2,
1,
1,
Optional.of(new VeniceProperties(controllerProps)),
Optional.of(controllerProps),
Optional.of(new VeniceProperties(serverProperties)),
Expand All @@ -124,56 +120,54 @@ public void setUp() {
parentControllerClient = new ControllerClient(clusterName, parentControllerURLs);
dc0Client = new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString());
dc1Client = new ControllerClient(clusterName, childDatacenters.get(1).getControllerConnectString());
dc2Client = new ControllerClient(clusterName, childDatacenters.get(2).getControllerConnectString());
dcControllerClientList = Arrays.asList(dc0Client, dc1Client, dc2Client);
dcControllerClientList = Arrays.asList(dc0Client, dc1Client);
}

@Test
public void testIncrementalPushIsEnabledForActiveActiveHybridUserStores() {
String storeName = TestUtils.getUniqueTopicString("test_store_");
createAndVerifyStoreInAllRegions(storeName, parentControllerClient, dcControllerClientList);
verifyDCConfigs(parentControllerClient, storeName, false, false, false);
verifyDCConfigs(dc0Client, storeName, false, false, false);
verifyDCConfigs(dc1Client, storeName, false, false, false);

assertCommand(
parentControllerClient.updateStore(
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(10)
.setHybridOffsetLagThreshold(2)
.setHybridDataReplicationPolicy(DataReplicationPolicy.ACTIVE_ACTIVE)));

verifyDCConfigAARepl(parentControllerClient, storeName, true);
verifyDCConfigAARepl(dc0Client, storeName, true);
verifyDCConfigAARepl(dc1Client, storeName, true);
verifyDCConfigAARepl(dc2Client, storeName, true);
verifyDCConfigs(parentControllerClient, storeName, true, true, true);
verifyDCConfigs(dc0Client, storeName, true, true, true);
verifyDCConfigs(dc1Client, storeName, true, true, true);
}

public static void verifyDCConfigAARepl(ControllerClient controllerClient, String storeName, boolean expectedStatus) {
public static void verifyDCConfigs(
ControllerClient controllerClient,
String storeName,
boolean expectedAAStatus,
boolean expectedIncPushStatus,
boolean isNonNullHybridStoreConfig) {
waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
StoreResponse storeResponse = assertCommand(controllerClient.getStore(storeName));
StoreInfo storeInfo = storeResponse.getStore();
System.out.println(storeInfo);
assertEquals(
storeInfo.isActiveActiveReplicationEnabled(),
expectedStatus,
expectedAAStatus,
"The active active replication config does not match.");

assertEquals(storeInfo.isIncrementalPushEnabled(), expectedStatus, "The incremental push config does not match.");

assertEquals(
storeInfo.isIncrementalPushEnabled(),
expectedIncPushStatus,
"The incremental push config does not match.");
if (!isNonNullHybridStoreConfig) {
assertNull(storeInfo.getHybridStoreConfig(), "The hybrid store config is not null.");
return;
}
HybridStoreConfig hybridStoreConfig = storeInfo.getHybridStoreConfig();
assertNotNull(hybridStoreConfig);
assertNotNull(hybridStoreConfig, "The hybrid store config is null.");
DataReplicationPolicy policy = hybridStoreConfig.getDataReplicationPolicy();
assertNotNull(policy);
});
}

public static void createAndVerifyStoreInAllRegions(
String storeName,
ControllerClient parentControllerClient,
List<ControllerClient> controllerClientList) {
Assert
.assertFalse(parentControllerClient.createNewStore(storeName, "owner", STRING_SCHEMA, STRING_SCHEMA).isError());
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
for (ControllerClient client: controllerClientList) {
Assert.assertFalse(client.getStore(storeName).isError());
}
assertNotNull(policy, "The data replication policy is null.");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE;
import static com.linkedin.venice.ConfigKeys.ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_HYBRID_STORE;
import static com.linkedin.venice.ConfigKeys.ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_PRODUCER_POOL_SIZE_PER_KAFKA_CLUSTER;
Expand Down Expand Up @@ -74,6 +76,8 @@ public void setUp() {

Properties controllerProps = new Properties();
controllerProps.put(CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE, "true");
controllerProps.put(ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_HYBRID_STORE, true);
controllerProps.put(ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES, true);

multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
NUMBER_OF_CHILD_DATACENTERS,
Expand Down Expand Up @@ -151,22 +155,15 @@ public void testAAReplicationForIncrementalPushToRT() throws Exception {
.setPartitionCount(1)
.setHybridOffsetLagThreshold(TEST_TIMEOUT / 2)
.setHybridRewindSeconds(2L)
.setIncrementalPushEnabled(true)
.setNativeReplicationEnabled(true)
.setNativeReplicationSourceFabric("dc-2");
TestUtils.assertCommand(parentControllerClient.updateStore(storeName, updateStoreParams));

UpdateStoreQueryParams enableAARepl = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true);

// Print all the kafka cluster URLs
LOGGER.info("KafkaURL {}:{}", dcNames[0], childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
LOGGER.info("KafkaURL {}:{}", dcNames[1], childDatacenters.get(1).getKafkaBrokerWrapper().getAddress());
LOGGER.info("KafkaURL {}:{}", dcNames[2], childDatacenters.get(2).getKafkaBrokerWrapper().getAddress());
LOGGER.info("KafkaURL {}:{}", parentRegionName, veniceParentDefaultKafka.getAddress());

// Turn on A/A in parent to trigger auto replication metadata schema registration
TestWriteUtils.updateStore(storeName, parentControllerClient, enableAARepl);

// verify store configs
TestUtils.verifyDCConfigNativeAndActiveRepl(
storeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2334,9 +2334,6 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
setStore.pushStreamSourceAddress =
pushStreamSourceAddress.map(addToUpdatedConfigList(updatedConfigsList, PUSH_STREAM_SOURCE_ADDRESS))
.orElseGet(currStore::getPushStreamSourceAddress);
setStore.activeActiveReplicationEnabled = activeActiveReplicationEnabled
.map(addToUpdatedConfigList(updatedConfigsList, ACTIVE_ACTIVE_REPLICATION_ENABLED))
.orElseGet(currStore::isActiveActiveReplicationEnabled);

if (storeViewConfig.isPresent()) {
// Validate and merge store views if they're getting set
Expand Down Expand Up @@ -2411,16 +2408,30 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
hybridDataReplicationPolicy,
hybridBufferReplayPolicy);

// Get VeniceControllerClusterConfig for the cluster
VeniceControllerClusterConfig clusterConfig =
veniceHelixAdmin.getHelixVeniceClusterResources(clusterName).getConfig();
// Check if the store is being converted to a hybrid store
boolean storeBeingConvertedToHybrid =
!currStore.isHybrid() && hybridStoreConfigNew != null && veniceHelixAdmin.isHybrid(hybridStoreConfigNew);

// Update active-active replication and incremental push settings
setStore.activeActiveReplicationEnabled = activeActiveReplicationEnabled
.map(addToUpdatedConfigList(updatedConfigsList, ACTIVE_ACTIVE_REPLICATION_ENABLED))
.orElseGet(currStore::isActiveActiveReplicationEnabled);
// Enable active-active replication automatically when a batch user store is being converted to a hybrid store
// and active-active replication is enabled as the default for hybrid stores via the cluster config
if (storeBeingConvertedToHybrid && !setStore.activeActiveReplicationEnabled && !currStore.isSystemStore()
&& clusterConfig.isActiveActiveReplicationEnabledAsDefaultForHybrid()) {
setStore.activeActiveReplicationEnabled = true;
updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED);
}

setStore.incrementalPushEnabled =
incrementalPushEnabled.map(addToUpdatedConfigList(updatedConfigsList, INCREMENTAL_PUSH_ENABLED))
.orElseGet(currStore::isIncrementalPushEnabled);
// Enable incremental push automatically when a batch user store is being converted to a hybrid store,
// active-active replication is enabled (or is being enabled), and the cluster config allows it.

if (!setStore.incrementalPushEnabled && !currStore.isSystemStore() && storeBeingConvertedToHybrid
&& setStore.activeActiveReplicationEnabled
&& clusterConfig.enabledIncrementalPushForHybridActiveActiveUserStores()) {
Expand Down

0 comments on commit ae3db5c

Please sign in to comment.