diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java
index ae6b630053..56ad71081b 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka.ttl;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.RMD_SCHEMA_DIR;
@@ -10,7 +13,12 @@
 import com.linkedin.davinci.schema.merge.MergeRecordHelper;
 import com.linkedin.davinci.schema.merge.UpdateResultStatus;
 import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory;
+import com.linkedin.venice.compression.CompressionStrategy;
+import com.linkedin.venice.compression.CompressorFactory;
+import com.linkedin.venice.compression.VeniceCompressor;
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.hadoop.AbstractVeniceFilter;
+import com.linkedin.venice.hadoop.input.kafka.KafkaInputUtils;
 import com.linkedin.venice.hadoop.schema.HDFSSchemaSource;
 import com.linkedin.venice.schema.rmd.RmdTimestampType;
 import com.linkedin.venice.schema.rmd.RmdUtils;
@@ -26,6 +34,8 @@
 import java.util.Objects;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 
 /**
@@ -34,6 +44,7 @@
  * @param <INPUT_VALUE>, the value contains schemaID, rmdId and rmdPayload that are required to retrieve RMD timestamp.
  */
 public abstract class VeniceRmdTTLFilter<INPUT_VALUE> extends AbstractVeniceFilter<INPUT_VALUE> {
+  private static final Logger LOGGER = LogManager.getLogger(VeniceRmdTTLFilter.class);
   private final TTLResolutionPolicy ttlPolicy;
   private final long filterTimestamp;
   private final HDFSSchemaSource schemaSource;
@@ -48,6 +59,7 @@ public abstract class VeniceRmdTTLFilter<INPUT_VALUE> extends AbstractVeniceFilt
   private final Map<RmdVersionId, RecordSerializer<GenericRecord>> rmdSerializerCache;
   private final Map<Integer, RecordSerializer<GenericRecord>> valueSerializerCache;
   private final MergeRecordHelper mergeRecordHelper = new CollectionTimestampMergeRecordHelper();
+  private final VeniceCompressor sourceVersionCompressor;
 
   public VeniceRmdTTLFilter(final VeniceProperties props) throws IOException {
     super();
@@ -62,6 +74,17 @@ public VeniceRmdTTLFilter(final VeniceProperties props) throws IOException {
     this.valueDeserializerCache = new VeniceConcurrentHashMap<>();
     this.rmdSerializerCache = new VeniceConcurrentHashMap<>();
     this.valueSerializerCache = new VeniceConcurrentHashMap<>();
+    String sourceVersion = props.getString(KAFKA_INPUT_TOPIC);
+    String kafkaInputBrokerUrl = props.getString(KAFKA_INPUT_BROKER_URL);
+    CompressionStrategy compressionStrategy =
+        CompressionStrategy.valueOf(props.getString(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY));
+    this.sourceVersionCompressor = KafkaInputUtils
+        .getCompressor(new CompressorFactory(), compressionStrategy, kafkaInputBrokerUrl, sourceVersion, props);
+    LOGGER.info(
+        "Created RMD based TTL filter with source version: {}, broker url: {}, compression strategy: {}",
+        sourceVersion,
+        kafkaInputBrokerUrl,
+        compressionStrategy);
   }
 
   @Override
@@ -108,7 +131,11 @@ boolean filterByTTLandMaybeUpdateValue(final INPUT_VALUE value) {
     } else {
       RecordDeserializer<GenericRecord> valueDeserializer =
           valueDeserializerCache.computeIfAbsent(valueSchemaId, this::generateValueDeserializer);
-      valueRecord = valueDeserializer.deserialize(getValuePayload(value));
+      try {
+        valueRecord = valueDeserializer.deserialize(sourceVersionCompressor.decompress(getValuePayload(value)));
+      } catch (Exception e) {
+        throw new VeniceException("Unable to deserialize value payload", e);
+      }
     }
     UpdateResultStatus updateResultStatus =
         mergeRecordHelper.deleteRecord(valueRecord, (GenericRecord) rmdTimestampObject, filterTimestamp, 0);
@@ -125,7 +152,11 @@ boolean filterByTTLandMaybeUpdateValue(final INPUT_VALUE value) {
         valueSerializerCache.computeIfAbsent(valueSchemaId, this::generateValueSerializer);
     RecordSerializer<GenericRecord> rmdSerializer =
         rmdSerializerCache.computeIfAbsent(rmdVersionId, this::generateRmdSerializer);
-    updateValuePayload(value, valueSerializer.serialize(valueRecord));
+    try {
+      updateValuePayload(value, sourceVersionCompressor.compress(valueSerializer.serialize(valueRecord)));
+    } catch (Exception e) {
+      throw new VeniceException("Unable to update value payload", e);
+    }
     updateRmdPayload(value, ByteBuffer.wrap(rmdSerializer.serialize(rmdRecord)));
     return false;
   }
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java
index 2dd76e6960..04dff2d4b8 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
@@ -12,6 +15,7 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.hadoop.AbstractVeniceFilter;
 import com.linkedin.venice.hadoop.FilterChain;
 import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
@@ -81,6 +85,9 @@ public void testValidFilterWhenTTLSpecified() {
     props.put(REPUSH_TTL_POLICY, TTLResolutionPolicy.RT_WRITE_ONLY.getValue());
     props.put(RMD_SCHEMA_DIR, "tmp");
     props.put(VALUE_SCHEMA_DIR, "tmp2");
+    props.put(KAFKA_INPUT_TOPIC, "test_v1");
+    props.put(KAFKA_INPUT_BROKER_URL, "dummy");
+    props.put(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString());
     props.put(REPUSH_TTL_START_TIMESTAMP, System.currentTimeMillis() - 10L * Time.MS_PER_SECOND);
 
     Assert.assertFalse(newMapper().getFilterChain(new VeniceProperties(props)).isEmpty());
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java
index d59dca79ad..4f0360ebd9 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceChunkedPayloadTTLFilter.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka.ttl;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
@@ -17,6 +20,7 @@
 
 import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory;
 import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerializer;
+import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.MultiSchemaResponse;
 import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
@@ -71,10 +75,12 @@ public void setUp() throws IOException {
     validProps.put(RMD_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VALUE_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VENICE_STORE_NAME_PROP, TEST_STORE);
+    validProps.put(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString());
+    validProps.put(KAFKA_INPUT_BROKER_URL, "dummy");
+    validProps.put(KAFKA_INPUT_TOPIC, TEST_STORE + "_v1");
     VeniceProperties valid = new VeniceProperties(validProps);
     // set up HDFS schema source to write dummy RMD schemas on temp directory
     setupHDFS(valid);
-
     this.filter = new VeniceChunkedPayloadTTLFilter(valid);
   }
 
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java
index 486556a168..0e6613c27f 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.java
@@ -1,5 +1,8 @@
 package com.linkedin.venice.hadoop.input.kafka.ttl;
 
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
@@ -12,6 +15,7 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
+import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.MultiSchemaResponse;
 import com.linkedin.venice.hadoop.FilterChain;
@@ -59,6 +63,9 @@ public void setUp() throws IOException {
     validProps.put(RMD_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VALUE_SCHEMA_DIR, getTempDataDirectory().getAbsolutePath());
     validProps.put(VENICE_STORE_NAME_PROP, TEST_STORE);
+    validProps.put(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString());
+    validProps.put(KAFKA_INPUT_BROKER_URL, "dummy");
+    validProps.put(KAFKA_INPUT_TOPIC, TEST_STORE + "_v1");
     VeniceProperties valid = new VeniceProperties(validProps);
     // set up HDFS schema source to write dummy RMD schemas on temp directory
     setupHDFS(valid);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
index 71e84ae629..643e2046ce 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
@@ -944,8 +944,8 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
     MetricsUtils.validateMetricRange(assembledRmdSizes, 290000, 740000);
   }
 
-  @Test(timeOut = TEST_TIMEOUT_MS)
-  public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
+  @Test(timeOut = TEST_TIMEOUT_MS, dataProvider = "Compression-Strategies", dataProviderClass = DataProviderUtils.class)
+  public void testRepushWithTTLWithActiveActivePartialUpdateStore(CompressionStrategy compressionStrategy) {
     final String storeName = Utils.getUniqueString("ttlRepushAAWC");
     String parentControllerUrl = parentController.getControllerUrl();
     Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("CollectionRecordV1.avsc"));
@@ -967,7 +967,7 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
     UpdateStoreQueryParams updateStoreParams =
         new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
             .setPartitionCount(1)
-            .setCompressionStrategy(CompressionStrategy.NO_OP)
+            .setCompressionStrategy(compressionStrategy)
             .setWriteComputationEnabled(true)
             .setActiveActiveReplicationEnabled(true)
             .setChunkingEnabled(true)