From 01ca67faf13e85a577eeb323500613439a123f4e Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Tue, 24 Sep 2024 00:21:35 -0700 Subject: [PATCH] Standardize code --- .../kafka/consumer/StoreIngestionTask.java | 13 ------------ .../consumer/StoreIngestionTaskTest.java | 15 ------------- .../store/AbstractStorageEngineTest.java | 21 ------------------- .../rocksdb/RocksDBStorageEngineTest.java | 5 ----- .../linkedin/venice/server/VeniceServer.java | 13 ++++++++++-- 5 files changed, 11 insertions(+), 56 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index fd08ebb653..bf3e35fb22 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -586,19 +586,6 @@ void resubscribeForAllPartitions() throws InterruptedException { } } - /** - * Removes partitions that are not subscribed / not assigned. - */ - - public synchronized void removeUnsubscribedPartitions() { - throwIfNotRunning(); - for (PartitionConsumptionState partitionConsumptionState: partitionConsumptionStateMap.values()) { - if (!partitionConsumptionState.isSubscribed()) { - partitionConsumptionStateMap.remove(partitionConsumptionState.getPartition(), partitionConsumptionState); - } - } - } - /** * Adds an asynchronous partition subscription request for the task. */ diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 9a78b34f06..a5b5969a80 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -2034,21 +2034,6 @@ public void testUnsubscribeConsumption(AAConfig aaConfig) throws Exception { }, aaConfig); } - @Test(dataProvider = "aaConfigProvider") - public void testRemoveUnsubscribedPartitions(AAConfig aaConfig) throws Exception { - localVeniceWriter.broadcastStartOfPush(new HashMap<>()); - localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID); - - runTest(Utils.setOf(PARTITION_FOO), () -> { - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).started(topic, PARTITION_FOO); - // Start of push has already been consumed. Stop consumption. - storeIngestionTaskUnderTest.unSubscribePartition(fooTopicPartition); - // Unassigned partitions should be removed. - storeIngestionTaskUnderTest.removeUnsubscribedPartitions(); - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).stopped(anyString(), anyInt(), anyLong()); - }, aaConfig); - } - @Test(dataProvider = "aaConfigProvider") public void testKillConsumption(AAConfig aaConfig) throws Exception { final Thread writingThread = new Thread(() -> { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/AbstractStorageEngineTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/AbstractStorageEngineTest.java index a19a4831a9..cbe8b0117b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/AbstractStorageEngineTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/AbstractStorageEngineTest.java @@ -126,27 +126,6 @@ public void testAddingAPartitionTwice() throws Exception { Assert.fail("Adding the same partition:" + partitionId + " again did not throw any exception as expected."); } - public void testRemovingPartition() throws Exception { - - init(); - - // first, add partition - doAddPartition(partitionId); - - if (!testStore.containsPartition(partitionId)) { - Assert.fail("Adding a new partition: " + partitionId + "failed!"); - } - - // remove existing partition - doRemovePartition(partitionId); - - Assert.assertEquals( - testStoreEngine.containsPartition(partitionId), - false, - "Failed to remove partition: " + partitionId + " from the store engine!"); - - } - public void testRemovingPartitionTwice() throws Exception { init(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineTest.java index f2028d3a14..1af4b08fee 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineTest.java @@ -183,11 +183,6 @@ public void testAddingAPartitionTwice() throws Exception { super.testAddingAPartitionTwice(); } - @Test - public void testRemovingPartition() throws Exception { - super.testRemovingPartition(); - } - @Test public void testRemovingPartitionTwice() throws Exception { super.testRemovingPartitionTwice(); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 8012adffa3..da5d3e14f0 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -44,7 +44,11 @@ import com.linkedin.venice.listener.ServerReadMetadataRepository; import com.linkedin.venice.listener.ServerStoreAclHandler; import com.linkedin.venice.listener.StoreValueSchemasCacheService; -import com.linkedin.venice.meta.*; +import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository; +import com.linkedin.venice.meta.ReadOnlySchemaRepository; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.StaticClusterInfoProvider; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.schema.SchemaReader; import com.linkedin.venice.security.SSLFactory; @@ -63,7 +67,12 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean;