diff --git a/build.gradle b/build.gradle index e540694e6f..9c02cfde4a 100644 --- a/build.gradle +++ b/build.gradle @@ -269,6 +269,7 @@ subprojects { implementation libraries.grpcProtobuf implementation libraries.grpcServices implementation libraries.grpcStub + implementation 'org.ow2.asm:asm:9.7' compileOnly libraries.tomcatAnnotations } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index beb84ca31c..c219e375db 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -9,7 +9,7 @@ import com.linkedin.davinci.blobtransfer.BlobTransferManager; import com.linkedin.davinci.blobtransfer.BlobTransferUtil; -import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; import com.linkedin.davinci.config.StoreBackendConfig; import com.linkedin.davinci.config.VeniceConfigLoader; @@ -119,7 +119,7 @@ public DaVinciBackend( Optional> managedClients, ICProvider icProvider, Optional cacheConfig, - Function getRecordTransformer) { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients); try { VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig(); @@ -268,7 +268,7 @@ public DaVinciBackend( false, compressorFactory, cacheBackend, - getRecordTransformer, + recordTransformerFunction, true, // TODO: consider how/if a repair task would be valid for Davinci users? 
null, @@ -290,6 +290,10 @@ public DaVinciBackend( } if (backendConfig.isBlobTransferManagerEnabled()) { + if (recordTransformerFunction != null) { + throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer."); + } + blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart( configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(), configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(), @@ -462,7 +466,8 @@ private synchronized void bootstrap() { List partitions = storeNameToPartitionListMap.get(storeName); String versionTopic = version.kafkaTopicName(); LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); - aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageService.getStorageEngine(versionTopic)); + AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); + aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); StoreBackend storeBackend = getStoreOrThrow(storeName); storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); }); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index 6b2999d85f..34b1edbae8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -78,7 +78,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -138,6 +137,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private final AbstractAvroChunkingAdapter chunkingAdapter; private final Executor readChunkExecutorForLargeRequest; + private final DaVinciRecordTransformerConfig recordTransformerConfig; + public AvroGenericDaVinciClient( DaVinciConfig daVinciConfig, ClientConfig clientConfig, @@ -180,8 +181,17 @@ protected AvroGenericDaVinciClient( this.managedClients = managedClients; this.icProvider = icProvider; this.chunkingAdapter = chunkingAdapter; + this.recordTransformerConfig = daVinciConfig.getRecordTransformerConfig(); this.readChunkExecutorForLargeRequest = readChunkExecutorForLargeRequest != null ? readChunkExecutorForLargeRequest : READ_CHUNK_EXECUTOR; + + if (daVinciConfig.isIsolated() && recordTransformerConfig != null) { + // When both are enabled, this causes the storage engine to be deleted every time the client starts, + // since the record transformer config is never persisted to disk. Additionally, this will spawn multiple + // transformers per version, and if the user's transformer is stateful this could cause issues.
+ throw new VeniceClientException("Ingestion Isolation is not supported with DaVinciRecordTransformer"); + } + preValidation.run(); } @@ -657,7 +667,8 @@ protected GenericRecordChunkingAdapter getGenericRecordChunkingAdapter() { return GenericRecordChunkingAdapter.INSTANCE; } - private D2ServiceDiscoveryResponse discoverService() { + // Visible for testing + protected D2ServiceDiscoveryResponse discoverService() { try (TransportClient client = getTransportClient(clientConfig)) { if (!(client instanceof D2TransportClient)) { throw new VeniceClientException( @@ -688,6 +699,7 @@ private VeniceConfigLoader buildVeniceConfig() { if (kafkaBootstrapServers == null) { kafkaBootstrapServers = backendConfig.getString(KAFKA_BOOTSTRAP_SERVERS); } + VeniceProperties config = new PropertyBuilder().put(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName()) .put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4) // RocksDB default config .put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20) // RocksDB default config @@ -703,8 +715,7 @@ private VeniceConfigLoader buildVeniceConfig() { .put( RECORD_TRANSFORMER_VALUE_SCHEMA, daVinciConfig.isRecordTransformerEnabled() - // We're creating a new record transformer here just to get the schema - ? daVinciConfig.getRecordTransformer(0).getValueOutputSchema().toString() + ? recordTransformerConfig.getOutputValueSchema().toString() : "null") .put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1) // Explicitly disable memory limiter // in Isolated Process @@ -714,13 +725,14 @@ private VeniceConfigLoader buildVeniceConfig() { return new VeniceConfigLoader(config, config); } - private void initBackend( + // Visible for testing + protected void initBackend( ClientConfig clientConfig, VeniceConfigLoader configLoader, Optional> managedClients, ICProvider icProvider, Optional cacheConfig, - Function getRecordTransformer) { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { synchronized (AvroGenericDaVinciClient.class) { if (daVinciBackend == null) { logger @@ -732,7 +744,7 @@ private void initBackend( managedClients, icProvider, cacheConfig, - getRecordTransformer), + recordTransformerFunction), backend -> { // Ensure that existing backend is fully closed before a new one can be created. synchronized (AvroGenericDaVinciClient.class) { @@ -776,7 +788,7 @@ public synchronized void start() { managedClients, icProvider, cacheConfig, - daVinciConfig::getRecordTransformer); + daVinciConfig.getRecordTransformerFunction()); try { getBackend().verifyCacheConfigEquality(daVinciConfig.getCacheConfig(), getStoreName()); @@ -795,12 +807,28 @@ public synchronized void start() { this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema); this.genericRecordStoreDeserializerCache = new AvroStoreDeserializerCache(daVinciBackend.get().getSchemaRepository(), getStoreName(), true); - this.storeDeserializerCache = clientConfig.isSpecificClient() - ? 
new AvroSpecificStoreDeserializerCache<>( + + if (clientConfig.isSpecificClient()) { + if (daVinciConfig.isRecordTransformerEnabled()) { + if (recordTransformerConfig.getOutputValueClass() != clientConfig.getSpecificValueClass()) { + throw new VeniceClientException( + "Specific value class mismatch between ClientConfig and DaVinciRecordTransformer, expected=" + + clientConfig.getSpecificValueClass() + ", actual=" + + recordTransformerConfig.getOutputValueClass()); + } + + this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>( + recordTransformerConfig.getOutputValueSchema(), + clientConfig.getSpecificValueClass()); + } else { + this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>( daVinciBackend.get().getSchemaRepository(), getStoreName(), - clientConfig.getSpecificValueClass()) - : (AvroStoreDeserializerCache) this.genericRecordStoreDeserializerCache; + clientConfig.getSpecificValueClass()); + } + } else { + this.storeDeserializerCache = (AvroStoreDeserializerCache) this.genericRecordStoreDeserializerCache; + } ready.set(true); logger.info("Client is started successfully, storeName=" + getStoreName()); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java index 913b4aa475..9f4a2d92db 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java @@ -19,41 +19,44 @@ public class BlockingDaVinciRecordTransformer extends DaVinciRecordTran private final DaVinciRecordTransformer recordTransformer; private final CountDownLatch startLatch = new CountDownLatch(1); - public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer) { - super(recordTransformer.getStoreVersion()); + public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer, boolean storeRecordsInDaVinci) { + super(recordTransformer.getStoreVersion(), storeRecordsInDaVinci); this.recordTransformer = recordTransformer; } - public Schema getKeyOutputSchema() { - return this.recordTransformer.getKeyOutputSchema(); + public Schema getKeySchema() { + return this.recordTransformer.getKeySchema(); } - public Schema getValueOutputSchema() { - return this.recordTransformer.getValueOutputSchema(); + public Schema getOutputValueSchema() { + return this.recordTransformer.getOutputValueSchema(); } - public O put(Lazy key, Lazy value) { + public DaVinciRecordTransformerResult transform(Lazy key, Lazy value) { + return this.recordTransformer.transform(key, value); + } + + public void processPut(Lazy key, Lazy value) { try { // Waiting for onStartIngestionTask to complete before proceeding startLatch.await(); - return (O) this.recordTransformer.put(key, value); + this.recordTransformer.processPut(key, value); } catch (InterruptedException e) { // Restore the interrupt status Thread.currentThread().interrupt(); - return null; } } - public O delete(Lazy key) { - return (O) this.recordTransformer.delete(key); + public void processDelete(Lazy key) { + this.recordTransformer.processDelete(key); } - public void onStartIngestionTask() { - this.recordTransformer.onStartIngestionTask(); + public void onStartVersionIngestion() { + this.recordTransformer.onStartVersionIngestion(); startLatch.countDown(); } - public void onEndIngestionTask() { - 
this.recordTransformer.onEndIngestionTask(); + public void onEndVersionIngestion() { + this.recordTransformer.onEndVersionIngestion(); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java index a0fe254f8a..a51e535e66 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java @@ -1,7 +1,6 @@ package com.linkedin.davinci.client; import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig; -import java.util.function.Function; public class DaVinciConfig { @@ -32,7 +31,7 @@ public class DaVinciConfig { /** * Record transformer reference */ - private Function recordTransformerFunction; + private DaVinciRecordTransformerConfig recordTransformerConfig; /** * Whether to enable read-path metrics. @@ -107,7 +106,7 @@ public boolean isCacheEnabled() { } public boolean isRecordTransformerEnabled() { - return recordTransformerFunction != null; + return recordTransformerConfig != null; } public ObjectCacheConfig getCacheConfig() { @@ -119,17 +118,27 @@ public DaVinciConfig setCacheConfig(ObjectCacheConfig cacheConfig) { return this; } + public DaVinciConfig setRecordTransformerConfig(DaVinciRecordTransformerConfig recordTransformerConfig) { + this.recordTransformerConfig = recordTransformerConfig; + return this; + } + + public DaVinciRecordTransformerConfig getRecordTransformerConfig() { + return recordTransformerConfig; + } + public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) { - if (recordTransformerFunction != null) { - return recordTransformerFunction.apply(storeVersion); + if (recordTransformerConfig == null) { + return null; } - return null; + return recordTransformerConfig.getRecordTransformer(storeVersion); } - public DaVinciConfig setRecordTransformerFunction( - Function recordTransformerFunction) { - this.recordTransformerFunction = recordTransformerFunction; - return this; + public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction() { + if (recordTransformerConfig == null) { + return null; + } + return recordTransformerConfig.getRecordTransformerFunction(); } public boolean isReadMetricsEnabled() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java index 32de9dd5ed..526b42672d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java @@ -1,26 +1,32 @@ package com.linkedin.davinci.client; +import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.venice.annotation.Experimental; -import com.linkedin.venice.serializer.AvroSerializer; +import com.linkedin.venice.compression.VeniceCompressor; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.avro.Schema; +import org.objectweb.asm.ClassReader; /** - * This abstract class can be extended in order to transform records stored in the Da Vinci Client. 
+ * This abstract class can be extended in order to transform records stored in the Da Vinci Client, + * or write to a custom storage of your choice. * * The input is what is consumed from the raw Venice data set, whereas the output is what is stored * into Da Vinci's local storage (e.g. RocksDB). * - * Please note that your implementation should be thread safe, and that schema evolution is possible. + * Implementations should be thread-safe and support schema evolution. * - * N.B.: The inputs are wrapped inside {@link Lazy} so that if the implementation need not look at - * them, the deserialization cost is not paid. + * Note: Inputs are wrapped inside {@link Lazy} to avoid deserialization costs if not needed. * - * @param type of the input key - * @param type of the input value - * @param type of the output value + * @param the type of the input key + * @param the type of the input value + * @param the type of the output value */ @Experimental public abstract class DaVinciRecordTransformer { @@ -29,77 +35,179 @@ public abstract class DaVinciRecordTransformer { */ private final int storeVersion; - public DaVinciRecordTransformer(int storeVersion) { + /** + * Boolean to determine if records should be stored in Da Vinci. + */ + private final boolean storeRecordsInDaVinci; + + private final DaVinciRecordTransformerUtility recordTransformerUtility; + + /** + * @param storeVersion the version of the store + * @param storeRecordsInDaVinci set this to false if you intend to store records in a custom storage, + * and not in the Da Vinci Client. + */ + public DaVinciRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) { this.storeVersion = storeVersion; + this.storeRecordsInDaVinci = storeRecordsInDaVinci; + this.recordTransformerUtility = new DaVinciRecordTransformerUtility<>(this); } /** - * This will be the type returned by the {@link DaVinciClient}'s read operations. + * Returns the schema for the key used in {@link DaVinciClient}'s operations. + * + * @return a {@link Schema} corresponding to the type of {@link K}. + */ + public abstract Schema getKeySchema(); + + /** + * Returns the schema for the output value used in {@link DaVinciClient}'s operations. * * @return a {@link Schema} corresponding to the type of {@link O}. */ - public abstract Schema getKeyOutputSchema(); + public abstract Schema getOutputValueSchema(); - public abstract Schema getValueOutputSchema(); + /** + * Implement this method to transform records before they are stored. + * This can be useful for tasks such as filtering out unused fields to save storage space. + * + * @param key the key of the record to be transformed + * @param value the value of the record to be transformed + * @return {@link DaVinciRecordTransformerResult} + */ + public abstract DaVinciRecordTransformerResult transform(Lazy key, Lazy value); /** - * @param key to be put - * @param value to be put - * @return the object to keep in storage, or null if the put should be skipped + * Implement this method to manage custom state outside the Da Vinci Client. + * + * @param key the key of the record to be put + * @param value the value of the record to be put, + * derived from the output of {@link #transform(Lazy key, Lazy value)} */ - public abstract O put(Lazy key, Lazy value); + public abstract void processPut(Lazy key, Lazy value); /** - * By default, deletes will proceed. This can be overridden if some deleted records should be kept. + * Override this method to customize the behavior for record deletions. 
+ * For example, you can use this method to delete records from a custom storage outside the Da Vinci Client. + * By default, it performs no operation. * - * @param key to be deleted - * @return the object to keep in storage, or null to proceed with the deletion + * @param key the key of the record to be deleted */ - public O delete(Lazy key) { - return null; - } + public void processDelete(Lazy key) { + return; + }; /** - * This function is called as a lifecycle event at the beginning of an ingestion task. + * Lifecycle event triggered before consuming records for {@link #storeVersion}. + * Use this method to perform setup operations such as opening database connections or creating tables. * * By default, it performs no operation. - * */ - public void onStartIngestionTask() { + public void onStartVersionIngestion() { return; } /** - * This function is called as a lifecycle event at the end of an ingestion task + * Lifecycle event triggered when record consumption is stopped for {@link #storeVersion}. + * Use this method to perform cleanup operations such as closing database connections or dropping tables. * * By default, it performs no operation. - * */ - public void onEndIngestionTask() { + public void onEndVersionIngestion() { return; } /** - * Takes a value, serializes it and wrap it in a ByteByffer. + * Transforms and processes the given record. * - * @param schema the Avro schema defining the serialization format - * @param value value the value to be serialized - * @return a ByteBuffer containing the serialized value wrapped according to Avro specifications - */ - public final ByteBuffer getValueBytes(Schema schema, V value) { - ByteBuffer transformedBytes = ByteBuffer.wrap(new AvroSerializer(schema).serialize(value)); - ByteBuffer newBuffer = ByteBuffer.allocate(Integer.BYTES + transformedBytes.remaining()); - newBuffer.putInt(1); - newBuffer.put(transformedBytes); - newBuffer.flip(); - return newBuffer; + * @param key the key of the record to be put + * @param value the value of the record to be put + * @return {@link DaVinciRecordTransformerResult} + */ + public final DaVinciRecordTransformerResult transformAndProcessPut(Lazy key, Lazy value) { + DaVinciRecordTransformerResult transformerResult = transform(key, value); + DaVinciRecordTransformerResult.Result result = transformerResult.getResult(); + if (result == DaVinciRecordTransformerResult.Result.SKIP) { + return null; + } else if (result == DaVinciRecordTransformerResult.Result.UNCHANGED) { + processPut(key, (Lazy) value); + } else { + O transformedRecord = transformerResult.getValue(); + processPut(key, Lazy.of(() -> transformedRecord)); + } + + if (!storeRecordsInDaVinci) { + return null; + } + return transformerResult; } /** + * Serializes and compresses the value and prepends the schema ID to the resulting ByteBuffer. 
* - * @return the storeVersion + * @param value to be serialized and compressed + * @param schemaId to prepend to the ByteBuffer + * @return a ByteBuffer containing the schema ID followed by the serialized and compressed value + */ + public final ByteBuffer prependSchemaIdToHeader(O value, int schemaId, VeniceCompressor compressor) { + return recordTransformerUtility.prependSchemaIdToHeader(value, schemaId, compressor); + } + + /** + * Prepends the given schema ID to the provided ByteBuffer + * + * @param valueBytes the original serialized and compressed value + * @param schemaId to prepend to the ByteBuffer + * @return a ByteBuffer containing the schema ID followed by the serialized and compressed value + */ + public final ByteBuffer prependSchemaIdToHeader(ByteBuffer valueBytes, int schemaId) { + return recordTransformerUtility.prependSchemaIdToHeader(valueBytes, schemaId); + } + + /** + * @return {@link #storeVersion} */ public final int getStoreVersion() { return storeVersion; } + + /** + * @return the hash of the class bytecode + */ + // Visible for testing + public final int getClassHash() { + String className = this.getClass().getName().replace('.', '/') + ".class"; + try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(className)) { + ClassReader classReader = new ClassReader(inputStream); + byte[] bytecode = classReader.b; + return Arrays.hashCode(bytecode); + + } catch (IOException e) { + throw new VeniceException("Failed to get classHash", e); + } + } + + /** + * Bootstraps the client after it comes online. + */ + public final void onRecovery( + AbstractStorageEngine storageEngine, + Integer partition, + Lazy compressor) { + recordTransformerUtility.onRecovery(storageEngine, partition, compressor); + } + + /** + * @return {@link #storeRecordsInDaVinci} + */ + public final boolean getStoreRecordsInDaVinci() { + return storeRecordsInDaVinci; + } + + /** + * @return {@link #recordTransformerUtility} + */ + public final DaVinciRecordTransformerUtility getRecordTransformerUtility() { + return recordTransformerUtility; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerConfig.java new file mode 100644 index 0000000000..a92df1edae --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerConfig.java @@ -0,0 +1,57 @@ +package com.linkedin.davinci.client; + +import org.apache.avro.Schema; + + +/** + * Configuration class for {@link DaVinciRecordTransformer}, which is passed into {@link DaVinciConfig}. 
+ */ +public class DaVinciRecordTransformerConfig { + private final DaVinciRecordTransformerFunctionalInterface recordTransformerFunction; + private final Class outputValueClass; + private final Schema outputValueSchema; + + /** + * @param recordTransformerFunction the functional interface for creating a {@link DaVinciRecordTransformer} + * @param outputValueClass the class of the output value + * @param outputValueSchema the schema of the output value + */ + public DaVinciRecordTransformerConfig( + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, + Class outputValueClass, + Schema outputValueSchema) { + this.recordTransformerFunction = recordTransformerFunction; + this.outputValueClass = outputValueClass; + this.outputValueSchema = outputValueSchema; + } + + /** + * @return {@link #recordTransformerFunction} + */ + public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction() { + return recordTransformerFunction; + } + + /** + * @param storeVersion the store version + * @return a new {@link DaVinciRecordTransformer} + */ + public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) { + return recordTransformerFunction.apply(storeVersion); + } + + /** + * @return {@link #outputValueClass} + */ + public Class getOutputValueClass() { + return outputValueClass; + } + + /** + * @return {@link #outputValueSchema} + */ + public Schema getOutputValueSchema() { + return outputValueSchema; + } + +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerFunctionalInterface.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerFunctionalInterface.java new file mode 100644 index 0000000000..9ad02fec64 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerFunctionalInterface.java @@ -0,0 +1,10 @@ +package com.linkedin.davinci.client; + +/** + * This describes the implementation for the functional interface of {@link DaVinciRecordTransformer} + */ + +@FunctionalInterface +public interface DaVinciRecordTransformerFunctionalInterface { + DaVinciRecordTransformer apply(Integer storeVersion); +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerResult.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerResult.java new file mode 100644 index 0000000000..6fb79d4a0b --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerResult.java @@ -0,0 +1,57 @@ +package com.linkedin.davinci.client; + +import com.linkedin.venice.exceptions.VeniceException; + + +/** + * This class encapsulates the result of {@link DaVinciRecordTransformer#transform} + * + * @param the type of the output value + */ +public class DaVinciRecordTransformerResult { + public enum Result { + SKIP, UNCHANGED, TRANSFORMED + } + + private final Result result; + private O value; + + /** + * Use this constructor if result is {@link Result#SKIP} or {@link Result#UNCHANGED} + */ + public DaVinciRecordTransformerResult(Result result) { + if (result == Result.TRANSFORMED) { + throw new VeniceException( + "Invalid constructor usage:" + + "TRANSFORMED result passed to single-argument constructor. 
Use the two-argument constructor instead"); + } + + this.result = result; + } + + /** + * Use this constructor if result is {@link Result#TRANSFORMED} + */ + public DaVinciRecordTransformerResult(Result result, O value) { + if (result != Result.TRANSFORMED) { + throw new VeniceException( + "Invalid constructor usage:" + "This two-argument constructor only accepts TRANSFORMED results"); + } + this.result = result; + this.value = value; + } + + /** + * @return {@link Result} + */ + public Result getResult() { + return result; + } + + /** + * @return the transformed record + */ + public O getValue() { + return value; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerUtility.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerUtility.java new file mode 100644 index 0000000000..72cdf35a86 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerUtility.java @@ -0,0 +1,161 @@ +package com.linkedin.davinci.client; + +import com.linkedin.davinci.store.AbstractStorageEngine; +import com.linkedin.davinci.store.AbstractStorageIterator; +import com.linkedin.venice.compression.VeniceCompressor; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.serializer.AvroGenericDeserializer; +import com.linkedin.venice.serializer.AvroSerializer; +import com.linkedin.venice.utils.lazy.Lazy; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.avro.Schema; + + +/** + * Utility class for {@link DaVinciRecordTransformer}. + * + * @param the type of the input key + * @param the type of the output value + */ +public class DaVinciRecordTransformerUtility { + private final DaVinciRecordTransformer recordTransformer; + private AvroGenericDeserializer keyDeserializer; + private AvroGenericDeserializer outputValueDeserializer; + private AvroSerializer outputValueSerializer; + + public DaVinciRecordTransformerUtility(DaVinciRecordTransformer recordTransformer) { + this.recordTransformer = recordTransformer; + } + + /** + * Serializes and compresses the value and prepends the schema ID to the resulting ByteBuffer. 
+ * + * @param value to be serialized and compressed + * @param schemaId to prepend to the ByteBuffer + * @return a ByteBuffer containing the schema ID followed by the serialized and compressed value + */ + public final ByteBuffer prependSchemaIdToHeader(O value, int schemaId, VeniceCompressor compressor) { + byte[] serializedValue = getOutputValueSerializer().serialize(value); + byte[] compressedValue; + try { + compressedValue = compressor.compress(serializedValue); + } catch (IOException e) { + throw new RuntimeException(e); + } + + ByteBuffer valueBytes = ByteBuffer.wrap(compressedValue); + ByteBuffer newValueBytes = ByteBuffer.allocate(Integer.BYTES + valueBytes.remaining()); + newValueBytes.putInt(schemaId); + newValueBytes.put(valueBytes); + newValueBytes.flip(); + return newValueBytes; + } + + /** + * Prepends the given schema ID to the provided ByteBuffer + * + * @param valueBytes the original serialized and compressed value + * @param schemaId to prepend to the ByteBuffer + * @return a ByteBuffer containing the schema ID followed by the serialized and compressed value + */ + public final ByteBuffer prependSchemaIdToHeader(ByteBuffer valueBytes, int schemaId) { + ByteBuffer newBuffer = ByteBuffer.allocate(Integer.BYTES + valueBytes.remaining()); + newBuffer.putInt(schemaId); + newBuffer.put(valueBytes); + newBuffer.flip(); + return newBuffer; + } + + /** + * @return true if transformer logic has changed since the last time the class was loaded + */ + public boolean hasTransformerLogicChanged(int classHash) { + try { + String classHashPath = String.format("./classHash-%d.txt", recordTransformer.getStoreVersion()); + File f = new File(classHashPath); + if (f.exists()) { + try (BufferedReader br = new BufferedReader(new FileReader(classHashPath))) { + int storedClassHash = Integer.parseInt(br.readLine()); + if (storedClassHash == classHash) { + return false; + } + } + } + + try (FileWriter fw = new FileWriter(classHashPath)) { + fw.write(String.valueOf(classHash)); + } + return true; + } catch (IOException e) { + throw new VeniceException("Failed to check if transformation logic has changed", e); + } + } + + /** + * Bootstraps the client after it comes online. 
+ */ + public final void onRecovery( + AbstractStorageEngine storageEngine, + Integer partition, + Lazy compressor) { + // ToDo: Store class hash in RocksDB to support blob transfer + int classHash = recordTransformer.getClassHash(); + boolean transformerLogicChanged = hasTransformerLogicChanged(classHash); + + if (!recordTransformer.getStoreRecordsInDaVinci() || transformerLogicChanged) { + // Bootstrap from VT + storageEngine.clearPartitionOffset(partition); + } else { + // Bootstrap from local storage + AbstractStorageIterator iterator = storageEngine.getIterator(partition); + for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { + byte[] keyBytes = iterator.key(); + byte[] valueBytes = iterator.value(); + Lazy lazyKey = Lazy.of(() -> getKeyDeserializer().deserialize(ByteBuffer.wrap(keyBytes))); + Lazy lazyValue = Lazy.of(() -> { + ByteBuffer valueByteBuffer = ByteBuffer.wrap(valueBytes); + // Skip schema id + valueByteBuffer.position(Integer.BYTES); + ByteBuffer decompressedValueBytes; + try { + decompressedValueBytes = compressor.get().decompress(valueByteBuffer); + } catch (IOException e) { + throw new RuntimeException(e); + } + return getOutputValueDeserializer().deserialize(decompressedValueBytes); + }); + + recordTransformer.processPut(lazyKey, lazyValue); + } + } + } + + public AvroGenericDeserializer getKeyDeserializer() { + if (keyDeserializer == null) { + Schema keySchema = recordTransformer.getKeySchema(); + keyDeserializer = new AvroGenericDeserializer<>(keySchema, keySchema); + } + return keyDeserializer; + } + + public AvroGenericDeserializer getOutputValueDeserializer() { + if (outputValueDeserializer == null) { + Schema outputValueSchema = recordTransformer.getOutputValueSchema(); + outputValueDeserializer = new AvroGenericDeserializer<>(outputValueSchema, outputValueSchema); + } + return outputValueDeserializer; + } + + public AvroSerializer getOutputValueSerializer() { + if (outputValueSerializer == null) { + Schema outputValueSchema = recordTransformer.getOutputValueSchema(); + outputValueSerializer = new AvroSerializer<>(outputValueSchema); + } + return outputValueSerializer; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java index 7bdf8df43f..7479412a62 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java @@ -169,6 +169,16 @@ public DaVinciClient getGenericAvroClient(String storeName, DaVinci false); } + public DaVinciClient getGenericAvroClient(String storeName, DaVinciConfig config, Class valueClass) { + return getClient( + storeName, + config, + valueClass, + new GenericDaVinciClientConstructor<>(), + getClientClass(config, false), + false); + } + @Override public DaVinciClient getAndStartGenericAvroClient(String storeName, DaVinciConfig config) { return getClient( @@ -180,6 +190,19 @@ public DaVinciClient getAndStartGenericAvroClient(String storeName, true); } + public DaVinciClient getAndStartGenericAvroClient( + String storeName, + DaVinciConfig config, + Class valueClass) { + return getClient( + storeName, + config, + valueClass, + new GenericDaVinciClientConstructor<>(), + getClientClass(config, false), + true); + } + @Override public DaVinciClient getSpecificAvroClient( String 
storeName, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceStoreVersionConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceStoreVersionConfig.java index 98a8a06875..da753e1afc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceStoreVersionConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceStoreVersionConfig.java @@ -1,5 +1,6 @@ package com.linkedin.davinci.config; +import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.RECORD_TRANSFORMER_VALUE_SCHEMA; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.venice.ConfigKeys.ENABLE_BLOB_TRANSFER; @@ -44,6 +45,7 @@ public VeniceStoreVersionConfig( // Stores all storage engine configs that are needed to be persisted to disk. this.persistStorageEngineConfigs = new PropertyBuilder() .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()) + .put(RECORD_TRANSFORMER_VALUE_SCHEMA, getRocksDBServerConfig().getTransformerValueSchema()) .build(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 38c7d6bbb9..65d2d679ed 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -5,7 +5,7 @@ import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; -import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.replication.RmdWithValueSchemaId; @@ -72,7 +72,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; -import java.util.function.Function; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.logging.log4j.LogManager; @@ -113,7 +112,7 @@ public ActiveActiveStoreIngestionTask( int errorPartitionId, boolean isIsolatedIngestion, Optional cacheBackend, - Function getRecordTransformer) { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( builder, store, @@ -124,7 +123,7 @@ public ActiveActiveStoreIngestionTask( errorPartitionId, isIsolatedIngestion, cacheBackend, - getRecordTransformer); + recordTransformerFunction); this.rmdProtocolVersionId = version.getRmdVersionId(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 7ed014aa14..e1ff299155 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -13,7 +13,7 @@ import static java.lang.Thread.currentThread; import static 
java.lang.Thread.sleep; -import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceServerConfig; @@ -111,7 +111,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; -import java.util.function.Function; import org.apache.avro.Schema; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.protocol.SecurityProtocol; @@ -175,7 +174,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements // source. This could be a view of the data, or in our case a cache, or both potentially. private final Optional cacheBackend; - private final Function getRecordTransformer; + private final DaVinciRecordTransformerFunctionalInterface recordTransformerFunction; private final PubSubProducerAdapterFactory producerAdapterFactory; @@ -207,14 +206,14 @@ public KafkaStoreIngestionService( boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, Optional cacheBackend, - Function getRecordTransformer, + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, Optional sslFactory, HeartbeatMonitoringService heartbeatMonitoringService) { this.cacheBackend = cacheBackend; - this.getRecordTransformer = getRecordTransformer; + this.recordTransformerFunction = recordTransformerFunction; this.storageMetadataService = storageMetadataService; this.metadataRepo = metadataRepo; this.topicNameToIngestionTaskMap = new ConcurrentSkipListMap<>(); @@ -528,7 +527,7 @@ private StoreIngestionTask createStoreIngestionTask( partitionId, isIsolatedIngestion, cacheBackend, - getRecordTransformer); + recordTransformerFunction); } private static void shutdownExecutorService(ExecutorService executor, String name, boolean force) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index aad9b9be71..d16b93d63e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -15,7 +15,7 @@ import static java.lang.Long.max; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel; import com.linkedin.davinci.ingestion.LagType; @@ -103,7 +103,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; -import java.util.function.Function; import java.util.function.LongPredicate; import java.util.function.Predicate; import java.util.function.Supplier; @@ -204,7 +203,7 @@ public LeaderFollowerStoreIngestionTask( int errorPartitionId, boolean isIsolatedIngestion, Optional cacheBackend, - 
Function getRecordTransformer) { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( builder, store, @@ -215,7 +214,7 @@ public LeaderFollowerStoreIngestionTask( errorPartitionId, isIsolatedIngestion, cacheBackend, - getRecordTransformer, + recordTransformerFunction, builder.getLeaderFollowerNotifiers()); this.heartbeatMonitoringService = builder.getHeartbeatMonitoringService(); /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 6472161cfc..eb6d71eb4e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -20,6 +20,8 @@ import com.linkedin.davinci.client.BlockingDaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface; +import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -94,6 +96,7 @@ import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.serializer.AvroGenericDeserializer; import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; +import com.linkedin.venice.serializer.RecordDeserializer; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.utils.ByteUtils; @@ -311,7 +314,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final ChunkAssembler chunkAssembler; private final Optional cacheBackend; - private final DaVinciRecordTransformer recordTransformer; + private DaVinciRecordTransformer recordTransformer; protected final String localKafkaServer; protected final int localKafkaClusterId; @@ -355,7 +358,7 @@ public StoreIngestionTask( int errorPartitionId, boolean isIsolatedIngestion, Optional cacheBackend, - Function getRecordTransformer, + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, Queue notifiers) { this.storeConfig = storeConfig; this.readCycleDelayMs = storeConfig.getKafkaReadCycleDelayMs(); @@ -455,16 +458,26 @@ public StoreIngestionTask( this.chunkAssembler = new ChunkAssembler(storeName); this.cacheBackend = cacheBackend; - // Ensure getRecordTransformer does not return null - DaVinciRecordTransformer clientRecordTransformer = - getRecordTransformer != null ? getRecordTransformer.apply(store.getCurrentVersion()) : null; - this.recordTransformer = - clientRecordTransformer != null ? 
new BlockingDaVinciRecordTransformer(clientRecordTransformer) : null; - if (this.recordTransformer != null) { + if (recordTransformerFunction != null) { + DaVinciRecordTransformer clientRecordTransformer = recordTransformerFunction.apply(versionNumber); + this.recordTransformer = new BlockingDaVinciRecordTransformer( + clientRecordTransformer, + clientRecordTransformer.getStoreRecordsInDaVinci()); versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber); versionedIngestionStats.registerTransformerLifecycleStartLatency(storeName, versionNumber); versionedIngestionStats.registerTransformerLifecycleEndLatency(storeName, versionNumber); versionedIngestionStats.registerTransformerErrorSensor(storeName, versionNumber); + + // onStartVersionIngestion called here instead of run() because this needs to finish running + // before bootstrapping starts + long startTime = System.currentTimeMillis(); + recordTransformer.onStartVersionIngestion(); + long endTime = System.currentTimeMillis(); + versionedIngestionStats.recordTransformerLifecycleStartLatency( + storeName, + versionNumber, + LatencyUtils.getElapsedTimeFromMsToMs(startTime), + endTime); } this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS); @@ -595,8 +608,13 @@ void resubscribeForAllPartitions() throws InterruptedException { */ public synchronized void subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction) { throwIfNotRunning(); - partitionToPendingConsumerActionCountMap - .computeIfAbsent(topicPartition.getPartitionNumber(), x -> new AtomicInteger(0)) + int partitionNumber = topicPartition.getPartitionNumber(); + + if (recordTransformer != null) { + recordTransformer.onRecovery(storageEngine, partitionNumber, compressor); + } + + partitionToPendingConsumerActionCountMap.computeIfAbsent(partitionNumber, x -> new AtomicInteger(0)) .incrementAndGet(); consumerActionsQueue.add(new ConsumerAction(SUBSCRIBE, topicPartition, nextSeqNum(), isHelixTriggeredAction)); } @@ -1539,17 +1557,6 @@ public void run() { LOGGER.info("Running {}", ingestionTaskName); versionedIngestionStats.resetIngestionTaskPushTimeoutGauge(storeName, versionNumber); - if (recordTransformer != null) { - long startTime = System.currentTimeMillis(); - recordTransformer.onStartIngestionTask(); - long endTime = System.currentTimeMillis(); - versionedIngestionStats.recordTransformerLifecycleStartLatency( - storeName, - versionNumber, - LatencyUtils.getElapsedTimeFromMsToMs(startTime), - endTime); - } - while (isRunning()) { Store store = storeRepository.getStoreOrThrow(storeName); updateIngestionRoleIfStoreChanged(store); @@ -3503,20 +3510,19 @@ private int processKafkaDataMessage( // Check if put.getSchemaId is positive, if not default to 1 int putSchemaId = put.getSchemaId() > 0 ? 
put.getSchemaId() : 1; - // Do transformation recompute key, value and partition if (recordTransformer != null) { - long recordTransformStartTime = System.currentTimeMillis(); + long recordTransformerStartTime = System.currentTimeMillis(); ByteBuffer valueBytes = put.getPutValue(); Schema valueSchema = schemaRepository.getValueSchema(storeName, putSchemaId).getSchema(); + Lazy recordDeserializer = + Lazy.of(() -> new AvroGenericDeserializer<>(valueSchema, valueSchema)); - // Decompress/assemble record - Object assembledObject = chunkAssembler.bufferAndAssembleRecord( + ByteBuffer assembledObject = chunkAssembler.bufferAndAssembleRecord( consumerRecord.getTopicPartition(), putSchemaId, keyBytes, valueBytes, consumerRecord.getOffset(), - Lazy.of(() -> new AvroGenericDeserializer<>(valueSchema, valueSchema)), putSchemaId, compressor.get()); @@ -3527,25 +3533,46 @@ private int processKafkaDataMessage( SchemaEntry keySchema = schemaRepository.getKeySchema(storeName); Lazy lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema)); - Lazy lazyValue = Lazy.of(() -> assembledObject); + Lazy lazyValue = Lazy.of(() -> { + try { + ByteBuffer decompressedAssembledObject = compressor.get().decompress(assembledObject); + return recordDeserializer.get().deserialize(decompressedAssembledObject); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); - Object transformedRecord = null; + DaVinciRecordTransformerResult transformerResult; try { - transformedRecord = recordTransformer.put(lazyKey, lazyValue); + transformerResult = recordTransformer.transformAndProcessPut(lazyKey, lazyValue); } catch (Exception e) { versionedIngestionStats.recordTransformerError(storeName, versionNumber, 1, currentTimeMs); String errorMessage = "Record transformer experienced an error when transforming value=" + assembledObject; throw new VeniceMessageException(errorMessage, e); } - ByteBuffer transformedBytes = - recordTransformer.getValueBytes(recordTransformer.getValueOutputSchema(), transformedRecord); + + // Record was skipped, so don't write to storage engine + if (transformerResult == null + || transformerResult.getResult() == DaVinciRecordTransformerResult.Result.SKIP) { + return 0; + } + + ByteBuffer transformedBytes; + if (transformerResult.getResult() == DaVinciRecordTransformerResult.Result.UNCHANGED) { + // Use original value if the record wasn't transformed + transformedBytes = recordTransformer.prependSchemaIdToHeader(assembledObject, putSchemaId); + } else { + // Serialize and compress the new record if it was transformed + transformedBytes = + recordTransformer.prependSchemaIdToHeader(transformerResult.getValue(), putSchemaId, compressor.get()); + } put.putValue = transformedBytes; versionedIngestionStats.recordTransformerLatency( storeName, versionNumber, - LatencyUtils.getElapsedTimeFromMsToMs(recordTransformStartTime), + LatencyUtils.getElapsedTimeFromMsToMs(recordTransformerStartTime), currentTimeMs); writeToStorageEngine(producedPartition, keyBytes, put); } else { @@ -3577,6 +3604,20 @@ private int processKafkaDataMessage( keyBytes = leaderProducedRecordContext.getKeyBytes(); delete = ((Delete) leaderProducedRecordContext.getValueUnion()); } + + if (recordTransformer != null) { + SchemaEntry keySchema = schemaRepository.getKeySchema(storeName); + Lazy lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema)); + recordTransformer.processDelete(lazyKey); + + // This is called here after processDelete because if the user 
stores their data somewhere other than + // Da Vinci, this function needs to execute to allow them to delete the data from the appropriate store + if (!recordTransformer.getStoreRecordsInDaVinci()) { + // If we're not storing in Da Vinci, then no need to try to delete from the storageEngine + break; + } + } + keyLen = keyBytes.length; deleteFromStorageEngine(producedPartition, keyBytes, delete); if (metricsEnabled && recordLevelMetricEnabled.get()) { @@ -3852,7 +3893,7 @@ public synchronized void close() { if (recordTransformer != null) { long startTime = System.currentTimeMillis(); - recordTransformer.onEndIngestionTask(); + recordTransformer.onEndVersionIngestion(); long endTime = System.currentTimeMillis(); versionedIngestionStats.recordTransformerLifecycleEndLatency( storeName, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java index 21b480af85..6a48231c3c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java @@ -1,6 +1,6 @@ package com.linkedin.davinci.kafka.consumer; -import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -29,7 +29,6 @@ import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.function.BooleanSupplier; -import java.util.function.Function; public class StoreIngestionTaskFactory { @@ -52,7 +51,7 @@ public StoreIngestionTask getNewIngestionTask( int partitionId, boolean isIsolatedIngestion, Optional cacheBackend, - Function getRecordTransformer) { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { if (version.isActiveActiveReplicationEnabled()) { return new ActiveActiveStoreIngestionTask( builder, @@ -64,7 +63,7 @@ public StoreIngestionTask getNewIngestionTask( partitionId, isIsolatedIngestion, cacheBackend, - getRecordTransformer); + recordTransformerFunction); } return new LeaderFollowerStoreIngestionTask( builder, @@ -76,7 +75,7 @@ public StoreIngestionTask getNewIngestionTask( partitionId, isIsolatedIngestion, cacheBackend, - getRecordTransformer); + recordTransformerFunction); } /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java index 1562c71469..c49292ee1f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java @@ -751,4 +751,8 @@ public void suppressLogs(boolean suppressLogs) { public boolean hasMemorySpaceLeft() { return true; } + + public AbstractStorageIterator getIterator(int partitionId) { + throw new UnsupportedOperationException("Method not supported for storage engine"); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageIterator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageIterator.java new file mode 
100644 index 0000000000..35db8173c0 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageIterator.java @@ -0,0 +1,61 @@ +package com.linkedin.davinci.store; + +public abstract class AbstractStorageIterator { + /** + * @return true if the iterator is valid + */ + public boolean isValid() { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * @return the key of the current entry + */ + public byte[] key() { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * @return the value of the current entry + */ + public byte[] value() { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * Moves to the next entry + */ + public void next() { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * Moves to the previous entry + */ + public void prev() { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * Seeks to the first entry whose key is greater than or equal to the given key + * + * @param key the key to seek to + */ + public void seek(byte[] key) { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * Seek to the first key + */ + public void seekToFirst() { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * Seek to the last key + */ + public void seekToLast() { + throw new UnsupportedOperationException("Method not implemented"); + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java index 7292438a16..1668f038a4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java @@ -175,4 +175,8 @@ public void deleteWithReplicationMetadata(byte[] key, byte[] metadata) { public long getRmdByteUsage() { throw new VeniceUnsupportedOperationException("getRmdByteUsage"); } + + public AbstractStorageIterator getIterator() { + throw new UnsupportedOperationException("Method not supported for storage engine"); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java index 109d95dc45..551dc3bf04 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java @@ -406,8 +406,7 @@ public RocksDBServerConfig(VeniceProperties props) { */ this.maxLogFileNum = props.getInt(ROCKSDB_MAX_LOG_FILE_NUM, 3); this.maxLogFileSize = props.getSizeInBytes(ROCKSDB_MAX_LOG_FILE_SIZE, 10 * 1024 * 1024); // 10MB; - this.transformerValueSchema = - props.containsKey(RECORD_TRANSFORMER_VALUE_SCHEMA) ? 
props.getString(RECORD_TRANSFORMER_VALUE_SCHEMA) : "null"; + this.transformerValueSchema = props.getString(RECORD_TRANSFORMER_VALUE_SCHEMA, "null"); } public int getLevel0FileNumCompactionTriggerWriteOnlyVersion() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngine.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngine.java index 6ab36025b5..98db584f10 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngine.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngine.java @@ -6,6 +6,7 @@ import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.stats.RocksDBMemoryStats; import com.linkedin.davinci.store.AbstractStorageEngine; +import com.linkedin.davinci.store.AbstractStorageIterator; import com.linkedin.davinci.store.AbstractStoragePartition; import com.linkedin.davinci.store.StoragePartitionConfig; import com.linkedin.venice.exceptions.VeniceException; @@ -220,9 +221,7 @@ boolean hasConflictPersistedStoreEngineConfig() { VeniceProperties persistedStorageEngineConfig = Utils.parseProperties(storeEngineConfig); LOGGER.info("Found storage engine configs: {}", persistedStorageEngineConfig.toString(true)); boolean usePlainTableFormat = persistedStorageEngineConfig.getBoolean(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, true); - String transformerValueSchema = persistedStorageEngineConfig.containsKey(RECORD_TRANSFORMER_VALUE_SCHEMA) - ? persistedStorageEngineConfig.getString(RECORD_TRANSFORMER_VALUE_SCHEMA) - : "null"; + String transformerValueSchema = persistedStorageEngineConfig.getString(RECORD_TRANSFORMER_VALUE_SCHEMA, "null"); if (usePlainTableFormat != rocksDBServerConfig.isRocksDBPlainTableFormatEnabled()) { String existingTableFormat = usePlainTableFormat ? 
"PlainTable" : "BlockBasedTable"; String newTableFormat = @@ -292,4 +291,10 @@ public boolean hasMemorySpaceLeft() { public void setRocksDBServerConfig(RocksDBServerConfig rocksDBServerConfig) { this.rocksDBServerConfig = rocksDBServerConfig; } + + @Override + public AbstractStorageIterator getIterator(int partitionId) { + AbstractStoragePartition partition = getPartitionOrThrow(partitionId); + return partition.getIterator(); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageIterator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageIterator.java new file mode 100644 index 0000000000..024daaaaca --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageIterator.java @@ -0,0 +1,53 @@ +package com.linkedin.davinci.store.rocksdb; + +import com.linkedin.davinci.store.AbstractStorageIterator; +import org.rocksdb.RocksIterator; + + +public class RocksDBStorageIterator extends AbstractStorageIterator { + public final RocksIterator iterator; + + public RocksDBStorageIterator(RocksIterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean isValid() { + return iterator.isValid(); + } + + @Override + public byte[] key() { + return iterator.key(); + } + + @Override + public byte[] value() { + return iterator.value(); + } + + @Override + public void next() { + iterator.next(); + } + + @Override + public void prev() { + iterator.prev(); + } + + @Override + public void seek(byte[] key) { + iterator.seek(key); + } + + @Override + public void seekToFirst() { + iterator.seekToFirst(); + } + + @Override + public void seekToLast() { + iterator.seekToLast(); + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java index 87a084dba9..4c0742dd65 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java @@ -6,6 +6,7 @@ import com.linkedin.davinci.callback.BytesStreamingCallback; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.stats.RocksDBMemoryStats; +import com.linkedin.davinci.store.AbstractStorageIterator; import com.linkedin.davinci.store.AbstractStoragePartition; import com.linkedin.davinci.store.StoragePartitionConfig; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; @@ -987,4 +988,9 @@ public String getFullPathForTempSSTFileDir() { public RocksDBSstFileWriter getRocksDBSstFileWriter() { return rocksDBSstFileWriter; } + + @Override + public AbstractStorageIterator getIterator() { + return new RocksDBStorageIterator(rocksDB.newIterator()); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java index 02d72ac975..3000207a31 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java @@ -44,6 +44,12 @@ public ChunkAssembler(String storeName) { this.inMemoryStorageEngine.suppressLogs(true); } + /** + * Buffers and assembles chunks of a record. 
+ * + * If the record is chunked, it stores the chunks and returns null. + * Once all chunks of a record are received, it assembles, decompresses, and deserializes the record. + */ public T bufferAndAssembleRecord( PubSubTopicPartition pubSubTopicPartition, int schemaId, @@ -53,7 +59,46 @@ public T bufferAndAssembleRecord( Lazy> recordDeserializer, int readerSchemaId, VeniceCompressor compressor) { - T assembledRecord = null; + ByteBuffer assembledRecord = bufferAndAssembleRecord( + pubSubTopicPartition, + schemaId, + keyBytes, + valueBytes, + recordOffset, + readerSchemaId, + compressor); + T decompressedAndDeserializedRecord = null; + + // Record is a chunk. Return null + if (assembledRecord == null) { + return decompressedAndDeserializedRecord; + } + + try { + decompressedAndDeserializedRecord = + decompressAndDeserialize(recordDeserializer.get(), compressor, assembledRecord); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return decompressedAndDeserializedRecord; + } + + /** + * Buffers and assembles chunks of a record. + * + * If the record is chunked, it stores the chunks and returns null. + * Once all chunks of a record are received, it returns the compressed and serialized assembled record. + */ + public ByteBuffer bufferAndAssembleRecord( + PubSubTopicPartition pubSubTopicPartition, + int schemaId, + byte[] keyBytes, + ByteBuffer valueBytes, + long recordOffset, + int readerSchemaId, + VeniceCompressor compressor) { + ByteBuffer assembledRecord = null; if (!inMemoryStorageEngine.containsPartition(pubSubTopicPartition.getPartitionNumber())) { inMemoryStorageEngine.addStoragePartition(pubSubTopicPartition.getPartitionNumber()); @@ -72,21 +117,18 @@ public T bufferAndAssembleRecord( keyBytes, ValueRecord.create(schemaId, valueBytes.array()).serialize()); try { - assembledRecord = decompressAndDeserialize( - recordDeserializer.get(), + assembledRecord = RawBytesChunkingAdapter.INSTANCE.get( + inMemoryStorageEngine, + pubSubTopicPartition.getPartitionNumber(), + ByteBuffer.wrap(keyBytes), + false, + null, + null, + NoOpReadResponseStats.SINGLETON, + readerSchemaId, + RawBytesStoreDeserializerCache.getInstance(), compressor, - RawBytesChunkingAdapter.INSTANCE.get( - inMemoryStorageEngine, - pubSubTopicPartition.getPartitionNumber(), - ByteBuffer.wrap(keyBytes), - false, - null, - null, - NoOpReadResponseStats.SINGLETON, - readerSchemaId, - RawBytesStoreDeserializerCache.getInstance(), - compressor, - null)); + null); } catch (Exception ex) { // We might get an exception if we haven't persisted all the chunks for a given key. 
This // can actually happen if the client seeks to the middle of a chunked record either by @@ -98,9 +140,9 @@ public T bufferAndAssembleRecord( pubSubTopicPartition.getPubSubTopic().getName()); } } else { - // this is a fully specified record, no need to buffer and assemble it, just decompress and deserialize it + // this is a fully specified record, no need to buffer and assemble it, just return the valueBytes try { - assembledRecord = decompressAndDeserialize(recordDeserializer.get(), compressor, valueBytes); + assembledRecord = valueBytes; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java index bd9aec76fe..9712c73d21 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java @@ -3,21 +3,35 @@ import static com.linkedin.davinci.client.AvroGenericDaVinciClient.READ_CHUNK_EXECUTOR; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; import com.linkedin.alpini.base.concurrency.Executors; +import com.linkedin.davinci.DaVinciBackend; import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.VersionBackend; import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; +import com.linkedin.davinci.transformer.TestStringRecordTransformer; +import com.linkedin.venice.client.exceptions.VeniceClientException; import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; +import com.linkedin.venice.meta.ReadOnlySchemaRepository; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.serializer.AvroSerializer; import com.linkedin.venice.service.ICProvider; import com.linkedin.venice.utils.DaemonThreadFactory; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.ReferenceCounted; import com.linkedin.venice.utils.VeniceProperties; +import java.lang.reflect.Field; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -33,6 +47,55 @@ public class AvroGenericDaVinciClientTest { + public AvroGenericDaVinciClient setUpClientWithRecordTransformer( + ClientConfig clientConfig, + DaVinciConfig daVinciConfig) throws IllegalAccessException, NoSuchFieldException { + + if (daVinciConfig == null) { + daVinciConfig = new DaVinciConfig(); + } + + DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> new TestStringRecordTransformer(storeVersion, true), + String.class, + Schema.create(Schema.Type.STRING)); + daVinciConfig.setRecordTransformerConfig(recordTransformerConfig); + + VeniceProperties backendConfig = mock(VeniceProperties.class); + when(backendConfig.toProperties()).thenReturn(new java.util.Properties()); + + AvroGenericDaVinciClient dvcClient = + spy(new 
AvroGenericDaVinciClient<>(daVinciConfig, clientConfig, backendConfig, Optional.empty())); + doReturn(false).when(dvcClient).isReady(); + doNothing().when(dvcClient).initBackend(any(), any(), any(), any(), any(), any()); + + D2ServiceDiscoveryResponse mockDiscoveryResponse = mock(D2ServiceDiscoveryResponse.class); + when(mockDiscoveryResponse.getCluster()).thenReturn("test_cluster"); + when(mockDiscoveryResponse.getZkAddress()).thenReturn("mock_zk_address"); + when(mockDiscoveryResponse.getKafkaBootstrapServers()).thenReturn("mock_kafka_bootstrap_servers"); + doReturn(mockDiscoveryResponse).when(dvcClient).discoverService(); + + DaVinciBackend mockBackend = mock(DaVinciBackend.class); + when(mockBackend.getSchemaRepository()).thenReturn(mock(ReadOnlySchemaRepository.class)); + when(mockBackend.getStoreOrThrow(anyString())).thenReturn(mock(StoreBackend.class)); + when(mockBackend.getObjectCache()).thenReturn(null); + + ReadOnlySchemaRepository mockSchemaRepository = mock(ReadOnlySchemaRepository.class); + Schema mockKeySchema = new Schema.Parser().parse("{\"type\": \"int\"}"); + when(mockSchemaRepository.getKeySchema(anyString())).thenReturn(new SchemaEntry(1, mockKeySchema)); + when(mockBackend.getSchemaRepository()).thenReturn(mockSchemaRepository); + + // Use reflection to set the private static daVinciBackend field + Field backendField = AvroGenericDaVinciClient.class.getDeclaredField("daVinciBackend"); + AccessController.doPrivileged((PrivilegedAction) () -> { + backendField.setAccessible(true); + return null; + }); + backendField.set(null, new ReferenceCounted<>(mockBackend, ignored -> {})); + + return dvcClient; + } + @Test public void testPropertyBuilderWithRecordTransformer() { String schema = "{\n" + " \"type\": \"string\"\n" + "}\n"; @@ -40,7 +103,37 @@ public void testPropertyBuilderWithRecordTransformer() { new PropertyBuilder().put("kafka.admin.class", "name").put("record.transformer.value.schema", schema).build(); RocksDBServerConfig dbconfig = new RocksDBServerConfig(config); Assert.assertEquals(schema, dbconfig.getTransformerValueSchema()); + } + + @Test + public void testRecordTransformerClient() throws NoSuchFieldException, IllegalAccessException { + ClientConfig clientConfig = mock(ClientConfig.class); + when(clientConfig.getStoreName()).thenReturn("test_store"); + when(clientConfig.getSpecificValueClass()).thenReturn(String.class); + when(clientConfig.isSpecificClient()).thenReturn(true); + + AvroGenericDaVinciClient dvcClient = setUpClientWithRecordTransformer(clientConfig, null); + dvcClient.start(); + } + + @Test + public void testRecordTransformerClientValueClassMismatch() throws NoSuchFieldException, IllegalAccessException { + ClientConfig clientConfig = mock(ClientConfig.class); + when(clientConfig.getStoreName()).thenReturn("test_store"); + when(clientConfig.getSpecificValueClass()).thenReturn(Integer.class); + when(clientConfig.isSpecificClient()).thenReturn(true); + + AvroGenericDaVinciClient dvcClient = setUpClientWithRecordTransformer(clientConfig, null); + assertThrows(VeniceClientException.class, () -> dvcClient.start()); + } + + @Test + public void testRecordTransformerWithIngestionIsolation() { + ClientConfig clientConfig = mock(ClientConfig.class); + DaVinciConfig daVinciConfig = new DaVinciConfig(); + daVinciConfig.setIsolated(true); + assertThrows(VeniceClientException.class, () -> setUpClientWithRecordTransformer(clientConfig, daVinciConfig)); } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java index 0ece16ce56..70e7a9c93b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java @@ -7,6 +7,8 @@ import com.linkedin.davinci.client.DaVinciConfig; import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerConfig; +import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; import org.apache.avro.Schema; import org.testng.annotations.Test; @@ -14,20 +16,28 @@ public class DaVinciConfigTest { public class TestRecordTransformer extends DaVinciRecordTransformer { - public TestRecordTransformer(int storeVersion) { - super(storeVersion); + public TestRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) { + super(storeVersion, storeRecordsInDaVinci); } - public Schema getKeyOutputSchema() { + @Override + public Schema getKeySchema() { return Schema.create(Schema.Type.INT); } - public Schema getValueOutputSchema() { + @Override + public Schema getOutputValueSchema() { return Schema.create(Schema.Type.INT); } - public Integer put(Lazy key, Lazy value) { - return value.get() + 1; + @Override + public DaVinciRecordTransformerResult transform(Lazy key, Lazy value) { + return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.TRANSFORMED, value.get() + 1); + } + + @Override + public void processPut(Lazy key, Lazy value) { + return; } } @@ -35,16 +45,24 @@ public Integer put(Lazy key, Lazy value) { public void testRecordTransformerEnabled() { DaVinciConfig config = new DaVinciConfig(); assertFalse(config.isRecordTransformerEnabled()); - config.setRecordTransformerFunction((storeVersion) -> new TestRecordTransformer(storeVersion)); + DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> new TestRecordTransformer(storeVersion, true), + Integer.class, + Schema.create(Schema.Type.INT)); + config.setRecordTransformerConfig(recordTransformerConfig); assertTrue(config.isRecordTransformerEnabled()); } @Test public void testGetAndSetRecordTransformer() { - Integer testStoreVersion = 0; + Integer testStoreVersion = 1; DaVinciConfig config = new DaVinciConfig(); assertNull(config.getRecordTransformer(testStoreVersion)); - config.setRecordTransformerFunction((storeVersion) -> new TestRecordTransformer(storeVersion)); + DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> new TestRecordTransformer(storeVersion, true), + Integer.class, + Schema.create(Schema.Type.INT)); + config.setRecordTransformerConfig(recordTransformerConfig); assertNotNull(config.getRecordTransformer(testStoreVersion)); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index ba8a2f0d1f..43716ce1ec 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -78,7 +78,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import com.linkedin.davinci.client.DaVinciRecordTransformer; 
+import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -97,11 +97,11 @@ import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.store.AbstractStorageEngine; +import com.linkedin.davinci.store.AbstractStorageIterator; import com.linkedin.davinci.store.AbstractStoragePartition; import com.linkedin.davinci.store.StoragePartitionConfig; import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; -import com.linkedin.davinci.transformer.TestAvroRecordTransformer; import com.linkedin.davinci.transformer.TestStringRecordTransformer; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; @@ -634,8 +634,8 @@ private void runTest( Set partitions, Runnable assertions, AAConfig aaConfig, - Function getRecordTransformer) throws Exception { - runTest(partitions, () -> {}, assertions, aaConfig, getRecordTransformer); + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { + runTest(partitions, () -> {}, assertions, aaConfig, recordTransformerFunction); } private void runTest( @@ -643,7 +643,7 @@ private void runTest( Runnable beforeStartingConsumption, Runnable assertions, AAConfig aaConfig, - Function getRecordTransformer) throws Exception { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { runTest( new RandomPollStrategy(), partitions, @@ -655,7 +655,7 @@ private void runTest( aaConfig, Collections.emptyMap(), storeVersionConfigOverride -> {}, - getRecordTransformer); + recordTransformerFunction); } private void runTest( @@ -703,7 +703,7 @@ private void runTest( Runnable beforeStartingConsumption, Runnable assertions, AAConfig aaConfig, - Function getRecordTransformer) throws Exception { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { runTest( pollStrategy, partitions, @@ -715,7 +715,7 @@ private void runTest( aaConfig, Collections.emptyMap(), storeVersionConfigOverride -> {}, - getRecordTransformer); + recordTransformerFunction); } private void runTest( @@ -753,7 +753,7 @@ private void runTest( AAConfig aaConfig, Map extraServerProperties, Consumer storeVersionConfigOverride, - Function getRecordTransformer) throws Exception { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { runTest( pollStrategy, partitions, @@ -767,7 +767,7 @@ private void runTest( aaConfig, extraServerProperties, storeVersionConfigOverride, - getRecordTransformer); + recordTransformerFunction); } /** @@ -800,7 +800,7 @@ private void runTest( AAConfig aaConfig, Map extraServerProperties, Consumer storeVersionConfigOverride, - Function getRecordTransformer) throws Exception { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { int partitionCount = PARTITION_COUNT; VenicePartitioner partitioner = getVenicePartitioner(); // Only get base venice partitioner @@ -821,9 +821,13 @@ private void runTest( Version version = storeAndVersionConfigsUnderTest.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigsUnderTest.storeVersionConfig; - StoreIngestionTaskFactory ingestionTaskFactory = 
- getIngestionTaskFactoryBuilder(pollStrategy, partitions, diskUsageForTest, extraServerProperties, false) - .build(); + StoreIngestionTaskFactory ingestionTaskFactory = getIngestionTaskFactoryBuilder( + pollStrategy, + partitions, + diskUsageForTest, + extraServerProperties, + false, + recordTransformerFunction).build(); Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); @@ -838,7 +842,7 @@ private void runTest( PARTITION_FOO, false, Optional.empty(), - getRecordTransformer)); + recordTransformerFunction)); Future testSubscribeTaskFuture = null; try { @@ -955,9 +959,22 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( Set partitions, Optional diskUsageForTest, Map extraServerProperties, - Boolean isLiveConfigEnabled) { - doReturn(new DeepCopyStorageEngine(mockAbstractStorageEngine)).when(mockStorageEngineRepository) - .getLocalStorageEngine(topic); + Boolean isLiveConfigEnabled, + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { + + if (recordTransformerFunction != null) { + doReturn(mockAbstractStorageEngine).when(mockStorageEngineRepository).getLocalStorageEngine(topic); + + AbstractStorageIterator iterator = mock(AbstractStorageIterator.class); + when(iterator.isValid()).thenReturn(true).thenReturn(false); + when(iterator.key()).thenReturn("mockKey".getBytes()); + when(iterator.value()).thenReturn("mockValue".getBytes()); + when(mockAbstractStorageEngine.getIterator(anyInt())).thenReturn(iterator); + + } else { + doReturn(new DeepCopyStorageEngine(mockAbstractStorageEngine)).when(mockStorageEngineRepository) + .getLocalStorageEngine(topic); + } inMemoryLocalKafkaConsumer = new MockInMemoryConsumer(inMemoryLocalKafkaBroker, pollStrategy, mockLocalKafkaConsumer); @@ -2753,7 +2770,8 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte Utils.setOf(PARTITION_FOO), Optional.empty(), extraServerProperties, - true).build(); + true, + null).build(); Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); @@ -2890,7 +2908,8 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica Utils.setOf(PARTITION_FOO), Optional.empty(), extraServerProperties, - false).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); + false, + null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); TopicManager mockTopicManagerRemoteKafka = mock(TopicManager.class); @@ -3108,7 +3127,8 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT Utils.setOf(PARTITION_FOO), Optional.empty(), new HashMap<>(), - false).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); + false, + null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); TopicManager mockTopicManagerRemoteKafka = mock(TopicManager.class); doReturn(mockTopicManager).when(mockTopicManagerRepository) @@ -3244,7 +3264,8 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( Utils.setOf(PARTITION_FOO), Optional.empty(), serverProperties, - false).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); + false, + null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); 
TopicManager mockTopicManagerRemoteKafka = mock(TopicManager.class); doReturn(mockTopicManager).when(mockTopicManagerRepository) @@ -3402,7 +3423,8 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node Utils.setOf(PARTITION_FOO), Optional.empty(), new HashMap<>(), - false).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); + false, + null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); @@ -3497,7 +3519,8 @@ public void testProcessTopicSwitch(NodeType nodeType) { Utils.setOf(PARTITION_FOO), Optional.empty(), new HashMap<>(), - false).setIsDaVinciClient(nodeType == DA_VINCI).build(); + false, + null).setIsDaVinciClient(nodeType == DA_VINCI).build(); Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); @@ -4194,7 +4217,8 @@ public void testBatchOnlyStoreDataRecovery() { Utils.setOf(PARTITION_FOO), Optional.empty(), Collections.emptyMap(), - true).build(); + true, + null).build(); doReturn(Version.parseStoreFromVersionTopic(topic)).when(store).getName(); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( store, @@ -4504,12 +4528,14 @@ public void testStoreIngestionRecordTransformer(AAConfig aaConfig) throws Except } catch (InterruptedException e) { throw new VeniceException(e); } - }, aaConfig, (storeVersion) -> new TestAvroRecordTransformer(storeVersion)); + }, aaConfig, (storeVersion) -> new TestStringRecordTransformer(storeVersion, true)); + + // Transformer error should never be recorded + verify(mockVersionedStorageIngestionStats, never()) + .recordTransformerError(eq(storeNameWithoutVersionInfo), anyInt(), anyDouble(), anyLong()); } // Test to throw type error when performing record transformation with incompatible types - // @Test(dataProvider = "aaConfigProvider", expectedExceptions = { VeniceException.class, VeniceMessageException.class - // }) @Test(dataProvider = "aaConfigProvider") public void testStoreIngestionRecordTransformerError(AAConfig aaConfig) throws Exception { byte[] keyBytes = new byte[1]; @@ -4565,7 +4591,7 @@ public void testStoreIngestionRecordTransformerError(AAConfig aaConfig) throws E // Verify transformer error was recorded verify(mockVersionedStorageIngestionStats, timeout(1000)) .recordTransformerError(eq(storeNameWithoutVersionInfo), anyInt(), anyDouble(), anyLong()); - }, aaConfig, TestStringRecordTransformer::new); + }, aaConfig, (storeVersion) -> new TestStringRecordTransformer(storeVersion, true)); } public enum RmdState { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/DaVinciRecordTransformerResultTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/DaVinciRecordTransformerResultTest.java new file mode 100644 index 0000000000..75e00669c8 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/DaVinciRecordTransformerResultTest.java @@ -0,0 +1,43 @@ +package com.linkedin.davinci.transformer; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; + +import com.linkedin.davinci.client.DaVinciRecordTransformerResult; +import com.linkedin.venice.exceptions.VeniceException; +import org.testng.annotations.Test; + + +public class 
DaVinciRecordTransformerResultTest { + public final static String testString = "testString"; + + @Test + public void testPassingInTransformedToSingleArgumentConstructor() { + assertThrows( + VeniceException.class, + () -> new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.TRANSFORMED)); + } + + @Test + public void testPassingInSkipToTwoArgumentConstructor() { + assertThrows( + VeniceException.class, + () -> new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.SKIP, testString)); + } + + @Test + public void testPassingInUnchangedToTwoArgumentConstructor() { + assertThrows( + VeniceException.class, + () -> new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.UNCHANGED, testString)); + } + + @Test + public void testRetrieveResult() { + DaVinciRecordTransformerResult transformerResult = + new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.TRANSFORMED, testString); + + assertEquals(transformerResult.getResult(), DaVinciRecordTransformerResult.Result.TRANSFORMED); + assertEquals(transformerResult.getValue(), testString); + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/RecordTransformerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/RecordTransformerTest.java index 53ad0a41bd..1cb8ef654a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/RecordTransformerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/RecordTransformerTest.java @@ -1,34 +1,126 @@ package com.linkedin.davinci.transformer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.linkedin.davinci.client.BlockingDaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerResult; +import com.linkedin.davinci.client.DaVinciRecordTransformerUtility; +import com.linkedin.davinci.store.AbstractStorageEngine; +import com.linkedin.davinci.store.AbstractStorageIterator; +import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.File; import org.apache.avro.Schema; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class RecordTransformerTest { + static final int storeVersion = 1; + + @BeforeMethod + @AfterClass + public void deleteClassHash() { + File file = new File(String.format("./classHash-%d.txt", storeVersion)); + if (file.exists()) { + assertTrue(file.delete()); + } + } + @Test public void testRecordTransformer() { - DaVinciRecordTransformer recordTransformer = new TestStringRecordTransformer(0); + DaVinciRecordTransformer recordTransformer = + new TestStringRecordTransformer(storeVersion, false); + assertEquals(recordTransformer.getStoreVersion(), storeVersion); + + Schema keySchema = recordTransformer.getKeySchema(); + assertEquals(keySchema.getType(), Schema.Type.INT); + + Schema outputValueSchema = recordTransformer.getOutputValueSchema(); + assertEquals(outputValueSchema.getType(), 
Schema.Type.STRING); + + Lazy lazyKey = Lazy.of(() -> 42); + Lazy lazyValue = Lazy.of(() -> "SampleValue"); + DaVinciRecordTransformerResult transformerResult = recordTransformer.transform(lazyKey, lazyValue); + recordTransformer.processPut(lazyKey, lazyValue); + assertEquals(transformerResult.getResult(), DaVinciRecordTransformerResult.Result.TRANSFORMED); + assertEquals(transformerResult.getValue(), "SampleValueTransformed"); + assertNull(recordTransformer.transformAndProcessPut(lazyKey, lazyValue)); + + recordTransformer.processDelete(lazyKey); + + assertFalse(recordTransformer.getStoreRecordsInDaVinci()); + + int classHash = recordTransformer.getClassHash(); + + DaVinciRecordTransformerUtility recordTransformerUtility = + recordTransformer.getRecordTransformerUtility(); + assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash)); + assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash)); + } + + @Test + public void testOnRecovery() { + DaVinciRecordTransformer recordTransformer = + new TestStringRecordTransformer(storeVersion, true); + + AbstractStorageIterator iterator = mock(AbstractStorageIterator.class); + when(iterator.isValid()).thenReturn(true).thenReturn(false); + when(iterator.key()).thenReturn("mockKey".getBytes()); + when(iterator.value()).thenReturn("mockValue".getBytes()); + + AbstractStorageEngine storageEngine = mock(AbstractStorageEngine.class); + Lazy compressor = Lazy.of(() -> mock(VeniceCompressor.class)); + + int partitionNumber = 1; + recordTransformer.onRecovery(storageEngine, partitionNumber, compressor); + verify(storageEngine, times(1)).clearPartitionOffset(partitionNumber); - assertEquals(recordTransformer.getStoreVersion(), 0); + // Reset the mock to clear previous interactions + reset(storageEngine); - Schema keyOutputSchema = recordTransformer.getKeyOutputSchema(); - assertEquals(keyOutputSchema.getType(), Schema.Type.INT); + // Execute the onRecovery method again to test the case where the classHash file exists + when(storageEngine.getIterator(partitionNumber)).thenReturn(iterator); + recordTransformer.onRecovery(storageEngine, partitionNumber, compressor); + verify(storageEngine, never()).clearPartitionOffset(partitionNumber); + verify(storageEngine, times(1)).getIterator(partitionNumber); + } + + @Test + public void testBlockingRecordTransformer() { + DaVinciRecordTransformer recordTransformer = new TestStringRecordTransformer(0, true); + recordTransformer = + new BlockingDaVinciRecordTransformer<>(recordTransformer, recordTransformer.getStoreRecordsInDaVinci()); + recordTransformer.onStartVersionIngestion(); - Schema valueOutputSchema = recordTransformer.getValueOutputSchema(); - assertEquals(valueOutputSchema.getType(), Schema.Type.STRING); + assertTrue(recordTransformer.getStoreRecordsInDaVinci()); + + Schema keySchema = recordTransformer.getKeySchema(); + assertEquals(keySchema.getType(), Schema.Type.INT); + + Schema outputValueSchema = recordTransformer.getOutputValueSchema(); + assertEquals(outputValueSchema.getType(), Schema.Type.STRING); Lazy lazyKey = Lazy.of(() -> 42); Lazy lazyValue = Lazy.of(() -> "SampleValue"); - String transformedRecord = recordTransformer.put(lazyKey, lazyValue); - assertEquals(transformedRecord, "SampleValueTransformed"); + DaVinciRecordTransformerResult recordTransformerResult = + recordTransformer.transformAndProcessPut(lazyKey, lazyValue); + assertEquals(recordTransformerResult.getValue(), "SampleValueTransformed"); + + recordTransformer.processDelete(lazyKey); - String 
deletedRecord = recordTransformer.delete(lazyKey); - assertNull(deletedRecord); + recordTransformer.onEndVersionIngestion(); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestAvroRecordTransformer.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestAvroRecordTransformer.java deleted file mode 100644 index 132eb105af..0000000000 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestAvroRecordTransformer.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.linkedin.davinci.transformer; - -import com.linkedin.davinci.client.DaVinciRecordTransformer; -import com.linkedin.venice.utils.lazy.Lazy; -import org.apache.avro.Schema; - - -public class TestAvroRecordTransformer extends DaVinciRecordTransformer { - public TestAvroRecordTransformer(int storeVersion) { - super(storeVersion); - } - - public Schema getKeyOutputSchema() { - return Schema.create(Schema.Type.INT); - } - - public Schema getValueOutputSchema() { - return Schema.create(Schema.Type.STRING); - } - - public Object put(Lazy key, Lazy value) { - return value.get() + "Transformed"; - } -} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java index ef71385157..f9d7a3020b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java @@ -1,24 +1,45 @@ package com.linkedin.davinci.transformer; import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; public class TestStringRecordTransformer extends DaVinciRecordTransformer { - public TestStringRecordTransformer(int storeVersion) { - super(storeVersion); + public TestStringRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) { + super(storeVersion, storeRecordsInDaVinci); } - public Schema getKeyOutputSchema() { + @Override + public Schema getKeySchema() { return Schema.create(Schema.Type.INT); } - public Schema getValueOutputSchema() { + @Override + public Schema getOutputValueSchema() { return Schema.create(Schema.Type.STRING); } - public String put(Lazy key, Lazy value) { - return value.get() + "Transformed"; + @Override + public DaVinciRecordTransformerResult transform(Lazy key, Lazy value) { + Object valueObj = value.get(); + String valueStr; + + if (valueObj instanceof Utf8) { + valueStr = valueObj.toString(); + } else { + valueStr = (String) valueObj; + } + + String transformedValue = valueStr + "Transformed"; + + return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.TRANSFORMED, transformedValue); + } + + @Override + public void processPut(Lazy key, Lazy value) { + return; } } diff --git a/docs/dev_guide/how_to/workspace_setup.md b/docs/dev_guide/how_to/workspace_setup.md index 318b471365..c80c6528d6 100644 --- a/docs/dev_guide/how_to/workspace_setup.md +++ b/docs/dev_guide/how_to/workspace_setup.md @@ -11,7 +11,7 @@ We recommend using a Unix-based environment for development, such as Linux or ma If you're on Windows, we recommend using [WSL2](https://learn.microsoft.com/en-us/windows/wsl/install). 
## Fork the Venice Repository -Fork the Venice repo at https://github.com/linkedin/venice +Fork the Venice repo at [https://github.com/linkedin/venice](https://github.com/linkedin/venice). ## Setting up the repository locally ```shell diff --git a/docs/user_guide/read_api/da_vinci_client.md b/docs/user_guide/read_api/da_vinci_client.md index cfc2adee04..9d412810f1 100644 --- a/docs/user_guide/read_api/da_vinci_client.md +++ b/docs/user_guide/read_api/da_vinci_client.md @@ -11,14 +11,17 @@ This allows you to eagerly load some or all partitions of the dataset and perfor cache. Future updates to the data continue to be streamed in and applied to the local cache. ## Record Transformer -This feature enables applications to transform records as they're being consumed and stored in the Da Vinci Client. +This feature enables applications to transform records before they're stored in the Da Vinci Client +or a custom storage of your choice. It's capable of handling records that are compressed and/or chunked. ### Usage -To use the record transformer, you will need to implement the +Steps to use the record transformer: +1. Implement the [DaVinciRecordTransformer](http://venicedb.org/javadoc/com/linkedin/davinci/client/DaVinciRecordTransformer.html) -abstract class, then pass in a functional interface into -[setRecordTransformerFunction()](https://venicedb.org/javadoc/com/linkedin/davinci/client/DaVinciConfig.html#setRecordTransformerFunction(com.linkedin.davinci.client.DaVinciRecordTransformer)). +abstract class. +2. Create an instance of [DaVinciRecordTransformerConfig](http://venicedb.org/javadoc/com/linkedin/davinci/client/DaVinciRecordTransformerConfig.html). +3. Pass the instance of the config into [setRecordTransformerConfig()](https://venicedb.org/javadoc/com/linkedin/davinci/client/DaVinciConfig.html#setRecordTransformerConfig(com.linkedin.davinci.client.DaVinciRecordTransformerConfig)). 
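The description above mentions storing transformed records in "a custom storage of your choice" instead of Da Vinci. Below is a minimal sketch of that option; the `ExternalMapTransformer` class name, the in-memory map standing in for the external store, and the generic parameters (key type, input value type, output value type) are illustrative assumptions rather than part of the Venice API. Passing `false` as the second constructor argument (`storeRecordsInDaVinci`) tells Da Vinci not to persist the transformed records locally, so `processPut` and `processDelete` are responsible for writing to and deleting from the external store. The standard implementation, which keeps the transformed records in Da Vinci, is shown further below.

```java
package com.linkedin.davinci.transformer;

import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.utils.lazy.Lazy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;


// Sketch: generic parameters are assumed to be <key, input value, output value>.
public class ExternalMapTransformer extends DaVinciRecordTransformer<Integer, String, String> {
  // Stand-in for an external store; replace with the client for your own storage system.
  private final Map<Integer, String> externalStore = new ConcurrentHashMap<>();

  public ExternalMapTransformer(int storeVersion) {
    // false = do not persist transformed records in Da Vinci's local storage engine.
    super(storeVersion, false);
  }

  @Override
  public Schema getKeySchema() {
    return Schema.create(Schema.Type.INT);
  }

  @Override
  public Schema getOutputValueSchema() {
    return Schema.create(Schema.Type.STRING);
  }

  @Override
  public DaVinciRecordTransformerResult<String> transform(Lazy<Integer> key, Lazy<String> value) {
    return new DaVinciRecordTransformerResult<>(
        DaVinciRecordTransformerResult.Result.TRANSFORMED, value.get() + "Transformed");
  }

  @Override
  public void processPut(Lazy<Integer> key, Lazy<String> value) {
    // With storeRecordsInDaVinci == false, this is where the transformed record is persisted.
    externalStore.put(key.get(), value.get());
  }

  @Override
  public void processDelete(Lazy<Integer> key) {
    // Deletes still need to be applied to the external store, since Da Vinci is not holding the data.
    externalStore.remove(key.get());
  }
}
```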
When a message is being consumed, the [DaVinciRecordTransformer](http://venicedb.org/javadoc/com/linkedin/davinci/client/DaVinciRecordTransformer.html) will @@ -31,30 +34,58 @@ package com.linkedin.davinci.transformer; import com.linkedin.davinci.client.DaVinciRecordTransformer; +import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; public class StringRecordTransformer extends DaVinciRecordTransformer { - public StringRecordTransformer(int storeVersion) { - super(storeVersion); + public StringRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) { + super(storeVersion, storeRecordsInDaVinci); } - public Schema getKeyOutputSchema() { + @Override + public Schema getKeySchema() { return Schema.create(Schema.Type.INT); } - public Schema getValueOutputSchema() { + @Override + public Schema getOutputValueSchema() { return Schema.create(Schema.Type.STRING); } - public String put(Lazy key, Lazy value) { - return value.get() + "Transformed"; + @Override + public DaVinciRecordTransformerResult transform(Lazy key, Lazy value) { + Object valueObj = value.get(); + String valueStr; + + if (valueObj instanceof Utf8) { + valueStr = valueObj.toString(); + } else { + valueStr = (String) valueObj; + } + + String transformedValue = valueStr + "Transformed"; + + /** + * If you want to skip a specific record or don't want to modify the value, + * use the single argument constructor for DaVinciRecordTransformerResult and pass in + * DaVinciRecordTransformerResult.Result.SKIP or DaVinciRecordTransformerResult.Result.UNCHANGED + */ + return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.TRANSFORMED, transformedValue); + } + + @Override + public void processPut(Lazy key, Lazy value) { + return; } } ``` -Here's an example `setRecordTransformerFunction()` implementation: +Here's an example `DaVinciRecordTransformerConfig` implementation: ``` DaVinciConfig config = new DaVinciConfig(); -config.setRecordTransformerFunction((storeVersion) -> new StringRecordTransformer(storeVersion)); +DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> new StringRecordTransformer(storeVersion, true), + String.class, Schema.create(Schema.Type.STRING)); +config.setRecordTransformerConfig(recordTransformerConfig); ``` \ No newline at end of file diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroSpecificStoreDeserializerCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroSpecificStoreDeserializerCache.java index 85e5540d80..15bebdd6d8 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroSpecificStoreDeserializerCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroSpecificStoreDeserializerCache.java @@ -27,6 +27,10 @@ public AvroSpecificStoreDeserializerCache( this(id -> schemaRepository.getValueSchema(storeName, id).getSchema(), valueClass); } + public AvroSpecificStoreDeserializerCache(Schema valueSchema, Class valueClass) { + this(id -> valueSchema, valueClass); + } + private AvroSpecificStoreDeserializerCache(IntFunction schemaGetter, Class valueClass) { this.schemaGetter = schemaGetter; this.cache = new SparseConcurrentList<>(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 6e30c0f70f..e2e9a50e7b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -29,6 +29,7 @@ import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob; import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps; import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; +import static com.linkedin.venice.utils.TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema; import static com.linkedin.venice.utils.TestWriteUtils.writeSimpleAvroFileWithIntToStringSchema; import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP; import static org.mockito.Mockito.mock; @@ -49,6 +50,7 @@ import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; +import com.linkedin.davinci.client.DaVinciRecordTransformerConfig; import com.linkedin.davinci.client.NonLocalAccessException; import com.linkedin.davinci.client.StorageClass; import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory; @@ -97,6 +99,7 @@ import io.tehuti.Metric; import io.tehuti.metrics.MetricsRepository; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Paths; @@ -116,7 +119,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -239,50 +241,374 @@ public void testConcurrentGetAndStart() throws Exception { } @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) - public void testBatchStore(DaVinciConfig clientConfig) throws Exception { - String storeName1 = - createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT, CompressionStrategy.GZIP, s -> null); - String storeName2 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); - String storeName3 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); - String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); - VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .put(DATA_BASE_PATH, baseDataPath) - .put(PERSISTENCE_TYPE, ROCKS_DB) - .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true) - .put(PUSH_STATUS_STORE_ENABLED, true) - .put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000) - .build(); + public void testRecordTransformer(DaVinciConfig clientConfig) throws Exception { + String storeName = Utils.getUniqueString("test-store"); + boolean pushStatusStoreEnabled = false; + boolean chunkingEnabled = false; + CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP; + String customValue = "a"; + int numKeys = 10; + + setUpStore(storeName, pushStatusStoreEnabled, chunkingEnabled, compressionStrategy, customValue, numKeys); + VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled); MetricsRepository metricsRepository = new MetricsRepository(); - // 
Test record transformation try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, metricsRepository, backendConfig)) { + DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> new TestStringRecordTransformer(storeVersion, true), + String.class, + Schema.create(Schema.Type.STRING)); + clientConfig.setRecordTransformerConfig(recordTransformerConfig); + DaVinciClient clientWithRecordTransformer = - factory.getAndStartGenericAvroClient(storeName1, clientConfig.setRecordTransformerFunction((storeVersion) -> { - TestRecordTransformer recordTransformer = new TestRecordTransformer(storeVersion); - recordTransformer.setOriginalSchema(Schema.parse(DEFAULT_VALUE_SCHEMA)); - return recordTransformer; - })); + factory.getAndStartGenericAvroClient(storeName, clientConfig); // Test non-existent key access clientWithRecordTransformer.subscribeAll().get(); - assertNull(clientWithRecordTransformer.get(KEY_COUNT + 1).get()); + assertNull(clientWithRecordTransformer.get(numKeys + 1).get()); // Test single-get access - for (int k = 0; k < KEY_COUNT; ++k) { - assertEquals(clientWithRecordTransformer.get(k).get(), 100); + for (int k = 1; k <= numKeys; ++k) { + Object valueObj = clientWithRecordTransformer.get(k).get(); + String expectedValue = "a" + k + "Transformed"; + assertEquals(valueObj.toString(), expectedValue); + } + clientWithRecordTransformer.unsubscribeAll(); + } + } + + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) + public void testTypeChangeRecordTransformer(DaVinciConfig clientConfig) throws Exception { + String storeName = Utils.getUniqueString("test-store"); + boolean pushStatusStoreEnabled = false; + boolean chunkingEnabled = false; + CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP; + int numKeys = 10; + + setUpStore(storeName, pushStatusStoreEnabled, chunkingEnabled, compressionStrategy, numKeys); + + VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled); + MetricsRepository metricsRepository = new MetricsRepository(); + + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig)) { + DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> new TestIntToStringRecordTransformer(storeVersion, true), + String.class, + Schema.create(Schema.Type.STRING)); + clientConfig.setRecordTransformerConfig(recordTransformerConfig); + + DaVinciClient clientWithRecordTransformer = + factory.getAndStartGenericAvroClient(storeName, clientConfig, String.class); + + // Test non-existent key access + clientWithRecordTransformer.subscribeAll().get(); + assertNull(clientWithRecordTransformer.get(numKeys + 1).get()); + + // Test single-get access + for (int k = 1; k <= numKeys; ++k) { + Object valueObj = clientWithRecordTransformer.get(k).get(); + String expectedValue = k + "Transformed"; + assertEquals(valueObj.toString(), expectedValue); + } + clientWithRecordTransformer.unsubscribeAll(); + } + } + + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) + public void testRecordTransformerOnRecovery(DaVinciConfig clientConfig) throws Exception { + String storeName = Utils.getUniqueString("test-store"); + 
boolean pushStatusStoreEnabled = true; + boolean chunkingEnabled = false; + CompressionStrategy compressionStrategy = CompressionStrategy.GZIP; + String customValue = "a"; + int numKeys = 10; + + setUpStore(storeName, pushStatusStoreEnabled, chunkingEnabled, compressionStrategy, customValue, numKeys); + + VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled); + MetricsRepository metricsRepository = new MetricsRepository(); + clientConfig.setStorageClass(StorageClass.DISK); + + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig)) { + + TestStringRecordTransformer recordTransformer = new TestStringRecordTransformer(1, true); + + DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> recordTransformer, + String.class, + Schema.create(Schema.Type.STRING)); + clientConfig.setRecordTransformerConfig(recordTransformerConfig); + + DaVinciClient clientWithRecordTransformer = + factory.getAndStartGenericAvroClient(storeName, clientConfig); + + // Test non-existent key access + clientWithRecordTransformer.subscribeAll().get(); + assertNull(clientWithRecordTransformer.get(numKeys + 1).get()); + assertNull(recordTransformer.get(numKeys + 1)); + + // Test single-get access + for (int k = 1; k <= numKeys; ++k) { + Object valueObj = clientWithRecordTransformer.get(k).get(); + String expectedValue = "a" + k + "Transformed"; + assertEquals(valueObj.toString(), expectedValue); + assertEquals(recordTransformer.get(k), expectedValue); + } + + /* + * Simulates a client restart. During this process, the DVRT will use the on-disk state + * to repopulate the inMemoryDB, avoiding the need for re-ingestion after clearing. 
+ */ + clientWithRecordTransformer.close(); + recordTransformer.clearInMemoryDB(); + assertTrue(recordTransformer.isInMemoryDBEmpty()); + + clientWithRecordTransformer.start(); + clientWithRecordTransformer.subscribeAll().get(); + + for (int k = 1; k <= numKeys; ++k) { + Object valueObj = clientWithRecordTransformer.get(k).get(); + String expectedValue = "a" + k + "Transformed"; + assertEquals(valueObj.toString(), expectedValue); + assertEquals(recordTransformer.get(k), expectedValue); + } + + clientWithRecordTransformer.unsubscribeAll(); + } + } + + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) + public void testRecordTransformerChunking(DaVinciConfig clientConfig) throws Exception { + // Construct a large string to trigger chunking + // (2MB = 2 * 1024 * 1024 bytes) + int sizeInBytes = 2 * 1024 * 1024; + StringBuilder stringBuilder = new StringBuilder(sizeInBytes); + while (stringBuilder.length() < sizeInBytes) { + stringBuilder.append("a"); + } + String largeString = stringBuilder.toString(); + + String storeName = Utils.getUniqueString("test-store"); + boolean pushStatusStoreEnabled = true; + boolean chunkingEnabled = true; + CompressionStrategy compressionStrategy = CompressionStrategy.GZIP; + int numKeys = 10; + + setUpStore(storeName, pushStatusStoreEnabled, chunkingEnabled, compressionStrategy, largeString, numKeys); + + VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled); + MetricsRepository metricsRepository = new MetricsRepository(); + clientConfig.setStorageClass(StorageClass.DISK); + + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig)) { + + TestStringRecordTransformer recordTransformer = new TestStringRecordTransformer(1, true); + + DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig( + (storeVersion) -> recordTransformer, + String.class, + Schema.create(Schema.Type.STRING)); + clientConfig.setRecordTransformerConfig(recordTransformerConfig); + + DaVinciClient clientWithRecordTransformer = + factory.getAndStartGenericAvroClient(storeName, clientConfig); + + // Test non-existent key access + clientWithRecordTransformer.subscribeAll().get(); + assertNull(clientWithRecordTransformer.get(numKeys + 1).get()); + assertNull(recordTransformer.get(numKeys + 1)); + + // Test single-get access + for (int k = 1; k <= numKeys; ++k) { + Object valueObj = clientWithRecordTransformer.get(k).get(); + String expectedValue = largeString + k + "Transformed"; + assertEquals(valueObj.toString(), expectedValue); + assertEquals(recordTransformer.get(k), expectedValue); + } + + /* + * Simulates a client restart. During this process, the DVRT will use the on-disk state + * to repopulate the inMemoryDB, avoiding the need for re-ingestion after clearing. 
+       */
+      clientWithRecordTransformer.close();
+      recordTransformer.clearInMemoryDB();
+      assertTrue(recordTransformer.isInMemoryDBEmpty());
+
+      clientWithRecordTransformer.start();
+      clientWithRecordTransformer.subscribeAll().get();
+
+      for (int k = 1; k <= numKeys; ++k) {
+        Object valueObj = clientWithRecordTransformer.get(k).get();
+        String expectedValue = largeString + k + "Transformed";
+        assertEquals(valueObj.toString(), expectedValue);
+        assertEquals(recordTransformer.get(k), expectedValue);
+      }
+
+      clientWithRecordTransformer.unsubscribeAll();
+    }
+  }
+
+  @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
+  public void testRecordTransformerWithEmptyDaVinci(DaVinciConfig clientConfig) throws Exception {
+    String storeName = Utils.getUniqueString("test-store");
+    boolean pushStatusStoreEnabled = false;
+    boolean chunkingEnabled = false;
+    CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
+    String customValue = "a";
+    int numKeys = 10;
+
+    setUpStore(storeName, pushStatusStoreEnabled, chunkingEnabled, compressionStrategy, customValue, numKeys);
+
+    VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled);
+    MetricsRepository metricsRepository = new MetricsRepository();
+
+    TestStringRecordTransformer recordTransformer = new TestStringRecordTransformer(1, false);
+
+    try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
+        d2Client,
+        VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
+        metricsRepository,
+        backendConfig)) {
+      DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig(
+          (storeVersion) -> recordTransformer,
+          String.class,
+          Schema.create(Schema.Type.STRING));
+      clientConfig.setRecordTransformerConfig(recordTransformerConfig);
+
+      DaVinciClient<Integer, Object> clientWithRecordTransformer =
+          factory.getAndStartGenericAvroClient(storeName, clientConfig);
+
+      clientWithRecordTransformer.subscribeAll().get();
+      for (int k = 1; k <= numKeys; ++k) {
+        // Record shouldn't be stored in Da Vinci
+        assertNull(clientWithRecordTransformer.get(k).get());
+
+        // Record should be stored in inMemoryDB
+        String expectedValue = "a" + k + "Transformed";
+        assertEquals(recordTransformer.get(k), expectedValue);
       }
       clientWithRecordTransformer.unsubscribeAll();
     }
+  }
+
+  @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
+  public void testSkipResultRecordTransformer(DaVinciConfig clientConfig) throws Exception {
+    String storeName = Utils.getUniqueString("test-store");
+    boolean pushStatusStoreEnabled = false;
+    boolean chunkingEnabled = false;
+    CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
+    String customValue = "a";
+    int numKeys = 10;
+
+    setUpStore(storeName, pushStatusStoreEnabled, chunkingEnabled, compressionStrategy, customValue, numKeys);
+
+    VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled);
+    MetricsRepository metricsRepository = new MetricsRepository();
-    if (clientConfig.isRecordTransformerEnabled()) {
-      clientConfig.setRecordTransformerFunction(null);
+    TestSkipResultRecordTransformer recordTransformer = new TestSkipResultRecordTransformer(1, true);
+
+    try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
+        d2Client,
+        VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
+        metricsRepository,
+        backendConfig)) {
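+      // DaVinciRecordTransformerConfig bundles a per-store-version transformer supplier with the
+      // transformer's output value class and Avro schema.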
+      DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig(
+          (storeVersion) -> recordTransformer,
+          String.class,
+          Schema.create(Schema.Type.STRING));
+      clientConfig.setRecordTransformerConfig(recordTransformerConfig);
+
+      DaVinciClient<Integer, Object> clientWithRecordTransformer =
+          factory.getAndStartGenericAvroClient(storeName, clientConfig);
+      clientWithRecordTransformer.subscribeAll().get();
+
+      /*
+       * Since the record transformer is skipping over every record,
+       * nothing should exist in Da Vinci or in the inMemoryDB.
+       */
+      assertTrue(recordTransformer.isInMemoryDBEmpty());
+      for (int k = 1; k <= numKeys; ++k) {
+        assertNull(clientWithRecordTransformer.get(k).get());
+      }
+      clientWithRecordTransformer.unsubscribeAll();
     }
+  }
+
+  @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
+  public void testUnchangedResultRecordTransformer(DaVinciConfig clientConfig) throws Exception {
+    String storeName = Utils.getUniqueString("test-store");
+    boolean pushStatusStoreEnabled = false;
+    boolean chunkingEnabled = false;
+    CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
+    String customValue = "a";
+    int numKeys = 10;
+
+    setUpStore(storeName, pushStatusStoreEnabled, chunkingEnabled, compressionStrategy, customValue, numKeys);
+
+    VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled);
+    MetricsRepository metricsRepository = new MetricsRepository();
+
+    try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
+        d2Client,
+        VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
+        metricsRepository,
+        backendConfig)) {
+      DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig(
+          (storeVersion) -> new TestUnchangedResultRecordTransformer(storeVersion, true),
+          String.class,
+          Schema.create(Schema.Type.STRING));
+      clientConfig.setRecordTransformerConfig(recordTransformerConfig);
+
+      DaVinciClient<Integer, Object> clientWithRecordTransformer =
+          factory.getAndStartGenericAvroClient(storeName, clientConfig);
+
+      // Test non-existent key access
+      clientWithRecordTransformer.subscribeAll().get();
+      assertNull(clientWithRecordTransformer.get(numKeys + 1).get());
+
+      // Records shouldn't be transformed
+      for (int k = 1; k <= numKeys; ++k) {
+        Object valueObj = clientWithRecordTransformer.get(k).get();
+        String expectedValue = "a" + k;
+        assertEquals(valueObj.toString(), expectedValue);
+      }
+      clientWithRecordTransformer.unsubscribeAll();
+    }
+  }
+
+  @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
+  public void testBatchStore(DaVinciConfig clientConfig) throws Exception {
+    String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
+    String storeName2 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
+    String storeName3 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
+    String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
+    VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
+        .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
+        .put(DATA_BASE_PATH, baseDataPath)
+        .put(PERSISTENCE_TYPE, ROCKS_DB)
+        .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true)
+        .put(PUSH_STATUS_STORE_ENABLED, true)
+        .put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000)
+        .build();
+
+    MetricsRepository metricsRepository = new MetricsRepository();
     // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path
     try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
@@ -1252,6 +1578,11 @@ private void generateHybridData(String storeName, List> dat
     }
   }
 
+  /*
+   * Batch data schema:
+   * Key: Integer
+   * Value: String
+   */
   private void setUpStore(
       String storeName,
       Consumer<UpdateStoreQueryParams> paramsConsumer,
@@ -1259,25 +1590,128 @@ private void setUpStore(
     setUpStore(storeName, paramsConsumer, propertiesConsumer, false);
   }
+  /*
+   * Batch data schema:
+   * Key: Integer
+   * Value: String
+   */
+  private void setUpStore(
+      String storeName,
+      Consumer<UpdateStoreQueryParams> paramsConsumer,
+      Consumer<Properties> propertiesConsumer,
+      boolean useDVCPushStatusStore) {
+    boolean chunkingEnabled = false;
+    CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
+
+    Runnable writeAvroFileRunnable = () -> {
+      try {
+        writeSimpleAvroFileWithIntToStringSchema(inputDir);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+    String valueSchema = "\"string\"";
+    setUpStore(
+        storeName,
+        paramsConsumer,
+        propertiesConsumer,
+        useDVCPushStatusStore,
+        chunkingEnabled,
+        compressionStrategy,
+        writeAvroFileRunnable,
+        valueSchema);
+  }
+
+  /*
+   * Batch data schema:
+   * Key: Integer
+   * Value: String
+   */
+  private void setUpStore(
+      String storeName,
+      boolean useDVCPushStatusStore,
+      boolean chunkingEnabled,
+      CompressionStrategy compressionStrategy,
+      String customValue,
+      int numKeys) {
+    Consumer<UpdateStoreQueryParams> paramsConsumer = params -> {};
+    Consumer<Properties> propertiesConsumer = properties -> {};
+    Runnable writeAvroFileRunnable = () -> {
+      try {
+        writeSimpleAvroFileWithIntToStringSchema(inputDir, customValue, numKeys);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+    String valueSchema = "\"string\"";
+    setUpStore(
+        storeName,
+        paramsConsumer,
+        propertiesConsumer,
+        useDVCPushStatusStore,
+        chunkingEnabled,
+        compressionStrategy,
+        writeAvroFileRunnable,
+        valueSchema);
+  }
+
+  /*
+   * Batch data schema:
+   * Key: Integer
+   * Value: Integer
+   */
+  private void setUpStore(
+      String storeName,
+      boolean useDVCPushStatusStore,
+      boolean chunkingEnabled,
+      CompressionStrategy compressionStrategy,
+      int numKeys) {
+    Consumer<UpdateStoreQueryParams> paramsConsumer = params -> {};
+    Consumer<Properties> propertiesConsumer = properties -> {};
+    Runnable writeAvroFileRunnable = () -> {
+      try {
+        writeSimpleAvroFileWithIntToIntSchema(inputDir, numKeys);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+    String valueSchema = "\"int\"";
+    setUpStore(
+        storeName,
+        paramsConsumer,
+        propertiesConsumer,
+        useDVCPushStatusStore,
+        chunkingEnabled,
+        compressionStrategy,
+        writeAvroFileRunnable,
+        valueSchema);
+  }
+
   private void setUpStore(
       String storeName,
       Consumer<UpdateStoreQueryParams> paramsConsumer,
       Consumer<Properties> propertiesConsumer,
-      boolean useDVCPushStatusStore) throws Exception {
+      boolean useDVCPushStatusStore,
+      boolean chunkingEnabled,
+      CompressionStrategy compressionStrategy,
+      Runnable writeAvroFileRunnable,
+      String valueSchema) {
     // Produce input data.
-    writeSimpleAvroFileWithIntToStringSchema(inputDir);
+    writeAvroFileRunnable.run();
     // Setup VPJ job properties.
     Properties vpjProperties = defaultVPJProps(cluster, inputDirPath, storeName);
     propertiesConsumer.accept(vpjProperties);
     // Create & update store for test.
     final int numPartitions = 3;
-    UpdateStoreQueryParams params = new UpdateStoreQueryParams().setPartitionCount(numPartitions); // Update the
-    // partition count.
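+    // Let each test control chunking and compression on the store under test.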
+    UpdateStoreQueryParams params = new UpdateStoreQueryParams().setPartitionCount(numPartitions)
+        .setChunkingEnabled(chunkingEnabled)
+        .setCompressionStrategy(compressionStrategy);
+    paramsConsumer.accept(params);
     try (ControllerClient controllerClient =
-        createStoreForJob(cluster, DEFAULT_KEY_SCHEMA, "\"string\"", vpjProperties)) {
+        createStoreForJob(cluster, DEFAULT_KEY_SCHEMA, valueSchema, vpjProperties)) {
       cluster.createMetaSystemStore(storeName);
       if (useDVCPushStatusStore) {
         cluster.createPushStatusSystemStore(storeName);
@@ -1303,18 +1737,25 @@ private String createStoreWithMetaSystemStoreAndPushStatusSystemStore(int keyCou
     return storeName;
   }
 
-  private String createStoreWithMetaSystemStoreAndPushStatusSystemStore(
-      int keyCount,
-      CompressionStrategy compressionStrategy,
-      Function compressionDictionaryGenerator) throws Exception {
-    String storeName = cluster.createStore(keyCount, compressionStrategy, compressionDictionaryGenerator);
-    cluster.createMetaSystemStore(storeName);
-    cluster.createPushStatusSystemStore(storeName);
-    return storeName;
-  }
-
   @DataProvider(name = "CompressionStrategy")
   public static Object[][] compressionStrategy() {
     return DataProviderUtils.allPermutationGenerator(DataProviderUtils.COMPRESSION_STRATEGIES);
   }
+
+  public VeniceProperties buildRecordTransformerBackendConfig(boolean pushStatusStoreEnabled) {
+    String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
+    PropertyBuilder backendPropertyBuilder = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
+        .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
+        .put(DATA_BASE_PATH, baseDataPath)
+        .put(PERSISTENCE_TYPE, ROCKS_DB)
+        .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true);
+
+    // Only wire in push-status reporting when the test enables the push status store.
+    if (pushStatusStoreEnabled) {
+      backendPropertyBuilder.put(PUSH_STATUS_STORE_ENABLED, true).put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000);
+    }
+
+    return backendPropertyBuilder.build();
+  }
 }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestIntToStringRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestIntToStringRecordTransformer.java
new file mode 100644
index 0000000000..bfcb13a04d
--- /dev/null
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestIntToStringRecordTransformer.java
@@ -0,0 +1,38 @@
+package com.linkedin.venice.endToEnd;
+
+import com.linkedin.davinci.client.DaVinciRecordTransformer;
+import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
+import com.linkedin.venice.utils.lazy.Lazy;
+import org.apache.avro.Schema;
+
+
+/**
+ * Transforms int values to strings.
+ */
+public class TestIntToStringRecordTransformer extends DaVinciRecordTransformer<Integer, Integer, String> {
+  public TestIntToStringRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) {
+    super(storeVersion, storeRecordsInDaVinci);
+  }
+
+  @Override
+  public Schema getKeySchema() {
+    return Schema.create(Schema.Type.INT);
+  }
+
+  @Override
+  public Schema getOutputValueSchema() {
+    return Schema.create(Schema.Type.STRING);
+  }
+
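+  // Returning TRANSFORMED hands the new string to the ingestion path, so when storeRecordsInDaVinci is
+  // true it is the value Da Vinci persists and serves back (the tests assert the "Transformed" suffix).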
+  @Override
+  public DaVinciRecordTransformerResult<String> transform(Lazy<Integer> key, Lazy<Integer> value) {
+    String valueStr = value.get().toString();
+    String transformedValue = valueStr + "Transformed";
+    return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.TRANSFORMED, transformedValue);
+  }
+
+  @Override
+  public void processPut(Lazy<Integer> key, Lazy<String> value) {
+    return;
+  }
+}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRecordTransformer.java
deleted file mode 100644
index 157c897c8f..0000000000
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRecordTransformer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.linkedin.venice.endToEnd;
-
-import com.linkedin.davinci.client.DaVinciRecordTransformer;
-import com.linkedin.venice.utils.lazy.Lazy;
-import org.apache.avro.Schema;
-
-
-public class TestRecordTransformer extends DaVinciRecordTransformer {
-  public TestRecordTransformer(int storeVersion) {
-    super(storeVersion);
-  }
-
-  Schema originalSchema;
-
-  public Schema getKeyOutputSchema() {
-    return originalSchema;
-  }
-
-  public Schema getValueOutputSchema() {
-    return originalSchema;
-  }
-
-  public void setOriginalSchema(Schema schema) {
-    this.originalSchema = schema;
-  }
-
-  public Integer put(Lazy key, Lazy value) {
-    return value.get() * 100;
-  }
-
-}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSkipResultRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSkipResultRecordTransformer.java
new file mode 100644
index 0000000000..f787ce0d2b
--- /dev/null
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSkipResultRecordTransformer.java
@@ -0,0 +1,63 @@
+package com.linkedin.venice.endToEnd;
+
+import com.linkedin.davinci.client.DaVinciRecordTransformer;
+import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
+import com.linkedin.venice.utils.lazy.Lazy;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * Skips every record; when transform() returns SKIP, processPut is never invoked, so skipped records end up
+ * neither in Da Vinci nor in this transformer's in-memory map.
+ */
+public class TestSkipResultRecordTransformer extends DaVinciRecordTransformer<Integer, String, String> {
+  private final Map<Integer, String> inMemoryDB = new HashMap<>();
+
+  public TestSkipResultRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) {
+    super(storeVersion, storeRecordsInDaVinci);
+  }
+
+  @Override
+  public Schema getKeySchema() {
+    return Schema.create(Schema.Type.INT);
+  }
+
+  @Override
+  public Schema getOutputValueSchema() {
+    return Schema.create(Schema.Type.STRING);
+  }
+
+  @Override
+  public DaVinciRecordTransformerResult<String> transform(Lazy<Integer> key, Lazy<String> value) {
+    return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.SKIP);
+  }
+
+  @Override
+  public void processPut(Lazy<Integer> key, Lazy<String> value) {
+    String valueStr = convertUtf8ToString(value.get());
+    put(key.get(), valueStr);
+  }
+
+  public String convertUtf8ToString(Object valueObj) {
+    String valueStr;
+    if (valueObj instanceof Utf8) {
+      valueStr = valueObj.toString();
+    } else {
+      valueStr = (String) valueObj;
+    }
+
+    return valueStr;
+  }
+
+  public boolean isInMemoryDBEmpty() {
+    return inMemoryDB.isEmpty();
+  }
+
+  public String get(Integer key) {
+    return inMemoryDB.get(key);
+  }
+
+  public void put(Integer key, String value) {
+    inMemoryDB.put(key, value);
+  }
+
+}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStringRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStringRecordTransformer.java
new file mode 100644
index 0000000000..ba13b4aa49
--- /dev/null
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStringRecordTransformer.java
@@ -0,0 +1,69 @@
+package com.linkedin.venice.endToEnd;
+
+import com.linkedin.davinci.client.DaVinciRecordTransformer;
+import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
+import com.linkedin.venice.utils.lazy.Lazy;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * Appends "Transformed" to every string value and mirrors ingested records into an in-memory map for assertions.
+ */
+public class TestStringRecordTransformer extends DaVinciRecordTransformer<Integer, String, String> {
+  private final Map<Integer, String> inMemoryDB = new HashMap<>();
+
+  public TestStringRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) {
+    super(storeVersion, storeRecordsInDaVinci);
+  }
+
+  @Override
+  public Schema getKeySchema() {
+    return Schema.create(Schema.Type.INT);
+  }
+
+  @Override
+  public Schema getOutputValueSchema() {
+    return Schema.create(Schema.Type.STRING);
+  }
+
+  @Override
+  public DaVinciRecordTransformerResult<String> transform(Lazy<Integer> key, Lazy<String> value) {
+    String valueStr = convertUtf8ToString(value.get());
+    String transformedValue = valueStr + "Transformed";
+    return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.TRANSFORMED, transformedValue);
+  }
+
+  @Override
+  public void processPut(Lazy<Integer> key, Lazy<String> value) {
+    String valueStr = convertUtf8ToString(value.get());
+    put(key.get(), valueStr);
+  }
+
+  public String convertUtf8ToString(Object valueObj) {
+    String valueStr;
+    if (valueObj instanceof Utf8) {
+      valueStr = valueObj.toString();
+    } else {
+      valueStr = (String) valueObj;
+    }
+
+    return valueStr;
+  }
+
+  public void clearInMemoryDB() {
+    inMemoryDB.clear();
+  }
+
+  public boolean isInMemoryDBEmpty() {
+    return inMemoryDB.isEmpty();
+  }
+
+  public String get(Integer key) {
+    return inMemoryDB.get(key);
+  }
+
+  public void put(Integer key, String value) {
+    inMemoryDB.put(key, value);
+  }
+
+}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestUnchangedResultRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestUnchangedResultRecordTransformer.java
new file mode 100644
index 0000000000..0f46fb1229
--- /dev/null
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestUnchangedResultRecordTransformer.java
@@ -0,0 +1,34 @@
+package com.linkedin.venice.endToEnd;
+
+import com.linkedin.davinci.client.DaVinciRecordTransformer;
+import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
+import com.linkedin.venice.utils.lazy.Lazy;
+import org.apache.avro.Schema;
+
+
+/**
+ * Returns every record unchanged; used to verify that UNCHANGED results are served exactly as ingested.
+ */
+public class TestUnchangedResultRecordTransformer extends DaVinciRecordTransformer<Integer, String, String> {
+  public TestUnchangedResultRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) {
+    super(storeVersion, storeRecordsInDaVinci);
+  }
+
+  @Override
+  public Schema getKeySchema() {
+    return Schema.create(Schema.Type.INT);
+  }
+
+  @Override
+  public Schema getOutputValueSchema() {
+    return Schema.create(Schema.Type.STRING);
+  }
+
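+  // Returning UNCHANGED keeps the original ingested value, so the client reads back the untransformed
+  // strings (testUnchangedResultRecordTransformer asserts there is no "Transformed" suffix).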
+  @Override
+  public DaVinciRecordTransformerResult<String> transform(Lazy<Integer> key, Lazy<String> value) {
+    return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.UNCHANGED);
+  }
+
+  @Override
+  public void processPut(Lazy<Integer> key, Lazy<String> value) {
+    return;
+  }
+
+}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java
index 4b2f11aaa6..f5e62a7e35 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java
@@ -1006,7 +1012,12 @@ public String createStore(
       Stream batchData,
       CompressionStrategy compressionStrategy,
       Function compressionDictionaryGenerator) {
-    return createStore(DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA, batchData, CompressionStrategy.NO_OP, null);
+    return createStore(
+        DEFAULT_KEY_SCHEMA,
+        DEFAULT_VALUE_SCHEMA,
+        batchData,
+        compressionStrategy,
+        compressionDictionaryGenerator);
   }
 
   public String createStore(int keyCount, GenericRecord record) {
diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java
index 62280a1dae..f73967276b 100644
--- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java
+++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java
@@ -123,6 +123,8 @@ public class TestWriteUtils {
   // Push Input Folder Schema
   public static final Schema INT_TO_STRING_SCHEMA =
       new PushInputSchemaBuilder().setKeySchema(INT_SCHEMA).setValueSchema(STRING_SCHEMA).build();
+  public static final Schema INT_TO_INT_SCHEMA =
+      new PushInputSchemaBuilder().setKeySchema(INT_SCHEMA).setValueSchema(INT_SCHEMA).build();
   public static final Schema STRING_TO_STRING_SCHEMA =
       new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(STRING_SCHEMA).build();
   public static final Schema STRING_TO_NAME_RECORD_V1_SCHEMA =
@@ -296,11 +298,27 @@ public static Schema writeSimpleAvroFileWithCustomSize(
   }
 
   public static Schema writeSimpleAvroFileWithIntToStringSchema(File parentDir) throws IOException {
+    return writeSimpleAvroFileWithIntToStringSchema(parentDir, "name ", DEFAULT_USER_DATA_RECORD_COUNT);
+  }
+
+  public static Schema writeSimpleAvroFileWithIntToStringSchema(File parentDir, String customValue, int numKeys)
+      throws IOException {
     return writeAvroFile(parentDir, "int2string.avro", INT_TO_STRING_SCHEMA, (recordSchema, writer) -> {
-      for (int i = 1; i <= DEFAULT_USER_DATA_RECORD_COUNT; ++i) {
+      for (int i = 1; i <= numKeys; ++i) {
+        GenericRecord i2s = new GenericData.Record(recordSchema);
+        i2s.put(DEFAULT_KEY_FIELD_PROP, i);
+        i2s.put(DEFAULT_VALUE_FIELD_PROP, customValue + i);
+        writer.append(i2s);
+      }
+    });
+  }
+
+  public static Schema writeSimpleAvroFileWithIntToIntSchema(File parentDir, int numKeys) throws IOException {
+    return writeAvroFile(parentDir, "int2int.avro", INT_TO_INT_SCHEMA, (recordSchema, writer) -> {
+      for (int i = 1; i <= numKeys; ++i) {
         GenericRecord i2s = new GenericData.Record(recordSchema);
         i2s.put(DEFAULT_KEY_FIELD_PROP, i);
-        i2s.put(DEFAULT_VALUE_FIELD_PROP, "name " + i);
+        i2s.put(DEFAULT_VALUE_FIELD_PROP, i);
         writer.append(i2s);
       }
     });