From 9b765b4e286155ed9316209be9287b50623f5655 Mon Sep 17 00:00:00 2001
From: Jialin Liu
Date: Thu, 5 Sep 2024 13:04:31 -0700
Subject: [PATCH] [vpj] Support AAWC TTL repush with compression (#1155)

AAWC TTL repush needs to deserialize the value and perform a TTL-based
delete on the record to decide whether it should be filtered. The existing
code does not handle the case where the source version has compression
enabled, so deserialization can fail. This PR fixes that by building the
source version's compressor and using it to decompress the value before
deserialization (and to re-compress the updated value afterwards).
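
For reference, below is a minimal sketch of the decompress-before-deserialize
and re-compress-after-serialize flow this change introduces. It is
illustrative only: the class and method names (CompressionAwareTtlSketch,
readValue, writeValue) are not part of the actual change; the compressor and
Avro (de)serializer calls are used the same way as in the diff below.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import com.linkedin.venice.compression.VeniceCompressor;
    import com.linkedin.venice.serializer.RecordDeserializer;
    import com.linkedin.venice.serializer.RecordSerializer;
    import org.apache.avro.generic.GenericRecord;

    public final class CompressionAwareTtlSketch {
      // Decompress the stored bytes with the source version's compressor before
      // Avro deserialization, so the TTL-based delete can operate on the record.
      static GenericRecord readValue(
          VeniceCompressor sourceVersionCompressor,
          RecordDeserializer<GenericRecord> valueDeserializer,
          ByteBuffer compressedValue) throws IOException {
        return valueDeserializer.deserialize(sourceVersionCompressor.decompress(compressedValue));
      }

      // Re-serialize the pruned record and re-compress it with the same strategy,
      // so the repushed payload matches the source version's compression format.
      static byte[] writeValue(
          VeniceCompressor sourceVersionCompressor,
          RecordSerializer<GenericRecord> valueSerializer,
          GenericRecord prunedRecord) throws IOException {
        return sourceVersionCompressor.compress(valueSerializer.serialize(prunedRecord));
      }
    }
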
---
 .../input/kafka/ttl/VeniceRmdTTLFilter.java   | 35 +++++++++++++++++--
 .../kafka/TestVeniceKafkaInputMapper.java     |  7 ++++
 .../TestVeniceChunkedPayloadTTLFilter.java    |  8 ++++-
 .../ttl/TestVeniceKafkaInputTTLFilter.java    |  7 ++++
 .../venice/endToEnd/PartialUpdateTest.java    |  6 ++--
 5 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java
index ae6b630053..56ad71081b 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka.ttl;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.RMD_SCHEMA_DIR;
@@ -10,7 +13,12 @@
 import com.linkedin.davinci.schema.merge.MergeRecordHelper;
 import com.linkedin.davinci.schema.merge.UpdateResultStatus;
 import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory;
+import com.linkedin.venice.compression.CompressionStrategy;
+import com.linkedin.venice.compression.CompressorFactory;
+import com.linkedin.venice.compression.VeniceCompressor;
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.hadoop.AbstractVeniceFilter;
+import com.linkedin.venice.hadoop.input.kafka.KafkaInputUtils;
 import com.linkedin.venice.hadoop.schema.HDFSSchemaSource;
 import com.linkedin.venice.schema.rmd.RmdTimestampType;
 import com.linkedin.venice.schema.rmd.RmdUtils;
@@ -26,6 +34,8 @@
 import java.util.Objects;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 
 /**
@@ -34,6 +44,7 @@
  * @param <INPUT_VALUE>, the value contains schemaID, rmdId and rmdPayload that are required to retrieve RMD timestamp.
  */
 public abstract class VeniceRmdTTLFilter<INPUT_VALUE> extends AbstractVeniceFilter<INPUT_VALUE> {
+  private static final Logger LOGGER = LogManager.getLogger(VeniceRmdTTLFilter.class);
   private final TTLResolutionPolicy ttlPolicy;
   private final long filterTimestamp;
   private final HDFSSchemaSource schemaSource;
@@ -48,6 +59,7 @@ public abstract class VeniceRmdTTLFilter<INPUT_VALUE> extends AbstractVeniceFilt
   private final Map<RmdVersionId, RecordSerializer<GenericRecord>> rmdSerializerCache;
   private final Map<Integer, RecordSerializer<GenericRecord>> valueSerializerCache;
   private final MergeRecordHelper mergeRecordHelper = new CollectionTimestampMergeRecordHelper();
+  private final VeniceCompressor sourceVersionCompressor;
 
   public VeniceRmdTTLFilter(final VeniceProperties props) throws IOException {
     super();
@@ -62,6 +74,17 @@ public VeniceRmdTTLFilter(final VeniceProperties props) throws IOException {
     this.valueDeserializerCache = new VeniceConcurrentHashMap<>();
     this.rmdSerializerCache = new VeniceConcurrentHashMap<>();
     this.valueSerializerCache = new VeniceConcurrentHashMap<>();
+    String sourceVersion = props.getString(KAFKA_INPUT_TOPIC);
+    String kafkaInputBrokerUrl = props.getString(KAFKA_INPUT_BROKER_URL);
+    CompressionStrategy compressionStrategy =
+        CompressionStrategy.valueOf(props.getString(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY));
+    this.sourceVersionCompressor = KafkaInputUtils
+        .getCompressor(new CompressorFactory(), compressionStrategy, kafkaInputBrokerUrl, sourceVersion, props);
+    LOGGER.info(
+        "Created RMD based TTL filter with source version: {}, broker url: {}, compression strategy: {}",
+        sourceVersion,
+        kafkaInputBrokerUrl,
+        compressionStrategy);
   }
 
   @Override
@@ -108,7 +131,11 @@ boolean filterByTTLandMaybeUpdateValue(final INPUT_VALUE value) {
     } else {
       RecordDeserializer<GenericRecord> valueDeserializer =
           valueDeserializerCache.computeIfAbsent(valueSchemaId, this::generateValueDeserializer);
-      valueRecord = valueDeserializer.deserialize(getValuePayload(value));
+      try {
+        valueRecord = valueDeserializer.deserialize(sourceVersionCompressor.decompress(getValuePayload(value)));
+      } catch (Exception e) {
+        throw new VeniceException("Unable to deserialize value payload", e);
+      }
     }
     UpdateResultStatus updateResultStatus =
         mergeRecordHelper.deleteRecord(valueRecord, (GenericRecord) rmdTimestampObject, filterTimestamp, 0);
@@ -125,7 +152,11 @@ boolean filterByTTLandMaybeUpdateValue(final INPUT_VALUE value) {
         valueSerializerCache.computeIfAbsent(valueSchemaId, this::generateValueSerializer);
     RecordSerializer<GenericRecord> rmdSerializer =
         rmdSerializerCache.computeIfAbsent(rmdVersionId, this::generateRmdSerializer);
-    updateValuePayload(value, valueSerializer.serialize(valueRecord));
+    try {
+      updateValuePayload(value, sourceVersionCompressor.compress(valueSerializer.serialize(valueRecord)));
+    } catch (Exception e) {
+      throw new VeniceException("Unable to update value payload", e);
+    }
     updateRmdPayload(value, ByteBuffer.wrap(rmdSerializer.serialize(rmdRecord)));
     return false;
   }
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java
index 2dd76e6960..04dff2d4b8 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
@@ -12,6 +15,7 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.hadoop.AbstractVeniceFilter;
 import com.linkedin.venice.hadoop.FilterChain;
 import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
@@ -81,6 +85,9 @@ public void testValidFilterWhenTTLSpecified() {
     props.put(REPUSH_TTL_POLICY, TTLResolutionPolicy.RT_WRITE_ONLY.getValue());
     props.put(RMD_SCHEMA_DIR, "tmp");
     props.put(VALUE_SCHEMA_DIR, "tmp2");
+    props.put(KAFKA_INPUT_TOPIC, "test_v1");
+    props.put(KAFKA_INPUT_BROKER_URL, "dummy");
+    props.put(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString());
     props.put(REPUSH_TTL_START_TIMESTAMP, System.currentTimeMillis() - 10L * Time.MS_PER_SECOND);
 
     Assert.assertFalse(newMapper().getFilterChain(new VeniceProperties(props)).isEmpty());
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java
index d59dca79ad..4f0360ebd9 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka.ttl;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
@@ -17,6 +20,7 @@
 
 import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory;
 import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerializer;
+import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.MultiSchemaResponse;
 import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
@@ -71,10 +75,12 @@ public void setUp() throws IOException {
     validProps.put(RMD_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VALUE_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VENICE_STORE_NAME_PROP, TEST_STORE);
+    validProps.put(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString());
+    validProps.put(KAFKA_INPUT_BROKER_URL, "dummy");
+    validProps.put(KAFKA_INPUT_TOPIC, TEST_STORE + "_v1");
     VeniceProperties valid = new VeniceProperties(validProps);
     // set up HDFS schema source to write dummy RMD schemas on temp directory
     setupHDFS(valid);
-
     this.filter = new VeniceChunkedPayloadTTLFilter(valid);
   }
 
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java
index 486556a168..0e6613c27f 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka.ttl;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
@@ -12,6 +15,7 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
+import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.MultiSchemaResponse;
 import com.linkedin.venice.hadoop.FilterChain;
@@ -59,6 +63,9 @@ public void setUp() throws IOException {
     validProps.put(RMD_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VALUE_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VENICE_STORE_NAME_PROP, TEST_STORE);
+    validProps.put(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString());
+    validProps.put(KAFKA_INPUT_BROKER_URL, "dummy");
+    validProps.put(KAFKA_INPUT_TOPIC, TEST_STORE + "_v1");
     VeniceProperties valid = new VeniceProperties(validProps);
     // set up HDFS schema source to write dummy RMD schemas on temp directory
     setupHDFS(valid);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
index 71e84ae629..643e2046ce 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
@@ -944,8 +944,8 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
     MetricsUtils.validateMetricRange(assembledRmdSizes, 290000, 740000);
   }
 
-  @Test(timeOut = TEST_TIMEOUT_MS)
-  public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
+  @Test(timeOut = TEST_TIMEOUT_MS, dataProvider = "Compression-Strategies", dataProviderClass = DataProviderUtils.class)
+  public void testRepushWithTTLWithActiveActivePartialUpdateStore(CompressionStrategy compressionStrategy) {
     final String storeName = Utils.getUniqueString("ttlRepushAAWC");
     String parentControllerUrl = parentController.getControllerUrl();
     Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("CollectionRecordV1.avsc"));
@@ -967,7 +967,7 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
     UpdateStoreQueryParams updateStoreParams =
         new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
             .setPartitionCount(1)
-            .setCompressionStrategy(CompressionStrategy.NO_OP)
+            .setCompressionStrategy(compressionStrategy)
             .setWriteComputationEnabled(true)
             .setActiveActiveReplicationEnabled(true)
             .setChunkingEnabled(true)