Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dvc][doc] Create MVP for DaVinciRecordTransformer #1087

Merged
merged 71 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
8dc58f0
Create MVP for record transformer
kvargha Jul 29, 2024
88b62fe
Fix docs and imports
kvargha Jul 29, 2024
fc856b7
Don't make serializers and deserializers global
kvargha Jul 29, 2024
03f0453
Dont check for boolean val
kvargha Jul 29, 2024
45ba862
Delete classHash if it exists
kvargha Jul 29, 2024
e8fa547
Assert file deletion is true
kvargha Jul 29, 2024
b3f3e0a
Improve code coverage
kvargha Jul 30, 2024
f3cacff
Add tests for blocking record transformer
kvargha Jul 30, 2024
d2532b3
Created AvroGenericDaVinciClient for record transformer test
kvargha Jul 30, 2024
4d3ee2b
Make sure getRecordTransformer is valid
kvargha Jul 30, 2024
189127c
Make sure getRecordTransformer isn't null
kvargha Jul 30, 2024
9995758
Merge branch 'main' into dvrt-mvp
kvargha Jul 30, 2024
2c421a6
Reorganize testRecordTransformerClient and fix key schema
kvargha Jul 30, 2024
68d2be8
Fix TestStringRecordTransformer to work with Avro objects and update doc
kvargha Jul 31, 2024
9ffbf41
Merge pull request #1 from kvargha/dvrt-mvp
kvargha Jul 31, 2024
fc0b188
Refactor onRecovery and add javadoc for DaVinciRecordTransformer's co…
kvargha Jul 31, 2024
05c22b4
Reset offset if we need to bootstrap from VT
kvargha Aug 1, 2024
1ef7c20
Merge branch 'main' into dvrt-mvp
kvargha Aug 2, 2024
6b56ff8
Delete classHash after running tests
kvargha Aug 2, 2024
2cbd14f
Throw an error if a user tries to use blob transfer with record trans…
kvargha Aug 2, 2024
1ed816b
Make previous public methods private, remove subscribe call inside on…
kvargha Aug 13, 2024
6162604
Correctly pass DVRT functional interface to initBackend, add todo to …
kvargha Aug 13, 2024
968d357
Fix spotbugs
kvargha Aug 13, 2024
e78340e
Init DVRT inside SIT, and move DVRT recovery to SIT
kvargha Aug 14, 2024
42485d9
Modify checkout action
kvargha Aug 15, 2024
22b57e9
Undo
kvargha Aug 15, 2024
5f4da22
Merge branch 'linkedin:main' into dvrt-mvp
kvargha Aug 15, 2024
e57caeb
Merge branch 'main' into dvrt-mvp
kvargha Aug 15, 2024
3c7ec6c
Fix compilation
kvargha Aug 15, 2024
36926d5
Cache deserializer/serializer
kvargha Aug 16, 2024
ef6ac83
Create utility class for record transformer
kvargha Aug 19, 2024
14fef31
Create AbstractStorageIterator and a RocksDB implementation
kvargha Aug 23, 2024
80dd551
Delete classHash file. Compare value classes and don't override value…
kvargha Sep 4, 2024
2dd00a8
Merge branch 'linkedin:main' into dvrt-mvp
kvargha Sep 4, 2024
a4227a5
Remove compareCacheConfig mock
kvargha Sep 4, 2024
91ba185
Wrap access modifier with doPrivileged
kvargha Sep 4, 2024
cf7169e
Merge branch 'main' into dvrt-mvp
kvargha Sep 12, 2024
4d958a2
Fix spotless error
kvargha Sep 12, 2024
4017da6
Merge branch 'main' into dvrt-mvp
kvargha Sep 17, 2024
d15f437
Remove unused variables
kvargha Sep 17, 2024
7e49576
Added a ToDo to make chunking with record transformer lazy, and make …
kvargha Sep 18, 2024
f175b10
Created a config for record transformer that's passed into the DaVinc…
kvargha Sep 19, 2024
849ebce
Add message envelope for DVRT
kvargha Oct 19, 2024
3f44da3
Merge branch 'main' into dvrt-mvp
kvargha Oct 19, 2024
ccf1940
Fix spotless issue
kvargha Oct 19, 2024
54ba9ee
Fix test
kvargha Oct 19, 2024
12b012c
update docs
kvargha Oct 19, 2024
e747f70
Dvrt mvp (#4)
kvargha Oct 19, 2024
33690b7
Add another test, and update docs
kvargha Oct 21, 2024
1bb889b
Cleanup code
kvargha Oct 21, 2024
9462bc7
Cleanup docs
kvargha Oct 21, 2024
ac165e6
Merge branch 'main' into dvrt-mvp
kvargha Oct 21, 2024
edbd3ca
Cleanup
kvargha Oct 21, 2024
0b74a7a
Cleanup
kvargha Oct 22, 2024
39201bc
Add integration test for onrecovery. And fix iterator deserializer issue
kvargha Oct 23, 2024
4a8e44a
Throw error if II and DVRT are enabled together
kvargha Oct 23, 2024
fdd0ef0
Cleanup
kvargha Oct 23, 2024
b09a3c9
Compress transformed value
kvargha Oct 24, 2024
feb73cf
Add test for chunking
kvargha Oct 25, 2024
71ffec7
Cleanup
kvargha Oct 25, 2024
6b8bab1
Cleanup
kvargha Oct 25, 2024
a94b196
cleanup
kvargha Oct 25, 2024
be07be4
cleanup
kvargha Oct 25, 2024
609c06d
Add versionIngestion to onStart/onEnd function names
kvargha Oct 25, 2024
4215967
Cleanup
kvargha Oct 25, 2024
beb821f
Cleanup
kvargha Oct 25, 2024
bb25b73
Address review comments
kvargha Oct 28, 2024
83a88bd
Reorganize
kvargha Oct 28, 2024
bf85c1e
Add overrides
kvargha Oct 28, 2024
3170405
Fix integration test
kvargha Oct 28, 2024
6aa3e74
Add type change test for DVRT
kvargha Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ subprojects {
implementation libraries.grpcProtobuf
implementation libraries.grpcServices
implementation libraries.grpcStub
compileOnly 'org.ow2.asm:asm:9.7'
compileOnly libraries.tomcatAnnotations
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER;
import static java.lang.Thread.currentThread;

import com.linkedin.davinci.client.BlockingDaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.StoreBackendConfig;
import com.linkedin.davinci.config.VeniceConfigLoader;
Expand Down Expand Up @@ -106,6 +108,7 @@ public class DaVinciBackend implements Closeable {
private IngestionBackend ingestionBackend;
private final AggVersionedStorageEngineStats aggVersionedStorageEngineStats;
private final boolean useDaVinciSpecificExecutionStatusForError;
private DaVinciRecordTransformer recordTransformer;
private final ClientConfig clientConfig;
private BlobTransferManager<Void> blobTransferManager;

Expand All @@ -115,7 +118,7 @@ public DaVinciBackend(
Optional<Set<String>> managedClients,
ICProvider icProvider,
Optional<ObjectCacheConfig> cacheConfig,
Function<Integer, DaVinciRecordTransformer> getRecordTransformer) {
DaVinciRecordTransformerFunctionalInterface getRecordTransformer) {
LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
try {
VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();
Expand Down Expand Up @@ -247,6 +250,19 @@ public DaVinciBackend(
cacheBackend = cacheConfig
.map(objectCacheConfig -> new ObjectCacheBackend(clientConfig, objectCacheConfig, schemaRepository));

String storeName = clientConfig.getStoreName();

if (getRecordTransformer != null && getRecordTransformer.apply(0) != null) {
kvargha marked this conversation as resolved.
Show resolved Hide resolved
storeRepository.refreshOneStore(storeName);
Store store = storeRepository.getStoreOrThrow(storeName);
int version = store.getCurrentVersion();

DaVinciRecordTransformer clientRecordTransformer = getRecordTransformer.apply(version);
recordTransformer = new BlockingDaVinciRecordTransformer(
kvargha marked this conversation as resolved.
Show resolved Hide resolved
clientRecordTransformer,
clientRecordTransformer.getStoreRecordsInDaVinci());
}

ingestionService = new KafkaStoreIngestionService(
storageService.getStorageEngineRepository(),
configLoader,
Expand All @@ -264,7 +280,7 @@ public DaVinciBackend(
false,
compressorFactory,
cacheBackend,
getRecordTransformer,
recordTransformer,
true,
// TODO: consider how/if a repair task would be valid for Davinci users?
null,
Expand All @@ -274,6 +290,7 @@ public DaVinciBackend(
null);

ingestionService.start();

ingestionService.addIngestionNotifier(ingestionListener);

if (isIsolatedIngestion() && cacheConfig.isPresent()) {
Expand Down Expand Up @@ -457,9 +474,16 @@ private synchronized void bootstrap() {
List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
String versionTopic = version.kafkaTopicName();
LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageService.getStorageEngine(versionTopic));
AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
StoreBackend storeBackend = getStoreOrThrow(storeName);
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));

if (recordTransformer != null) {
recordTransformer
.onRecovery(storageEngine, storeBackend, blobTransferManager, partitions, Optional.of(version));
} else {
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
kvargha marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
return subscribe(partitions, Optional.empty());
}

synchronized CompletableFuture<Void> subscribe(
public synchronized CompletableFuture<Void> subscribe(
kvargha marked this conversation as resolved.
Show resolved Hide resolved
ComplementSet<Integer> partitions,
Optional<Version> bootstrapVersion) {
if (daVinciCurrentVersion == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -136,6 +135,8 @@ protected boolean removeEldestEntry(Map.Entry<Integer, GenericRecord> eldest) {

private final AbstractAvroChunkingAdapter<V> chunkingAdapter;

private final DaVinciRecordTransformer recordTransformer;

public AvroGenericDaVinciClient(
DaVinciConfig daVinciConfig,
ClientConfig clientConfig,
Expand Down Expand Up @@ -175,6 +176,13 @@ protected AvroGenericDaVinciClient(
this.managedClients = managedClients;
this.icProvider = icProvider;
this.chunkingAdapter = chunkingAdapter;

this.recordTransformer = daVinciConfig.isRecordTransformerEnabled() ? daVinciConfig.getRecordTransformer(0) : null;
kvargha marked this conversation as resolved.
Show resolved Hide resolved

if (this.recordTransformer != null) {
this.clientConfig.setSpecificValueClass(recordTransformer.getOutputValueClass());
kvargha marked this conversation as resolved.
Show resolved Hide resolved
}

preValidation.run();
}

Expand Down Expand Up @@ -646,7 +654,8 @@ protected GenericRecordChunkingAdapter getGenericRecordChunkingAdapter() {
return GenericRecordChunkingAdapter.INSTANCE;
}

private D2ServiceDiscoveryResponse discoverService() {
// Visible for testing
public D2ServiceDiscoveryResponse discoverService() {
kvargha marked this conversation as resolved.
Show resolved Hide resolved
try (TransportClient client = getTransportClient(clientConfig)) {
if (!(client instanceof D2TransportClient)) {
throw new VeniceClientException(
Expand Down Expand Up @@ -677,6 +686,7 @@ private VeniceConfigLoader buildVeniceConfig() {
if (kafkaBootstrapServers == null) {
kafkaBootstrapServers = backendConfig.getString(KAFKA_BOOTSTRAP_SERVERS);
}

VeniceProperties config = new PropertyBuilder().put(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName())
.put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4) // RocksDB default config
.put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20) // RocksDB default config
Expand All @@ -691,10 +701,7 @@ private VeniceConfigLoader buildVeniceConfig() {
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
.put(
RECORD_TRANSFORMER_VALUE_SCHEMA,
daVinciConfig.isRecordTransformerEnabled()
// We're creating a new record transformer here just to get the schema
? daVinciConfig.getRecordTransformer(0).getValueOutputSchema().toString()
: "null")
recordTransformer != null ? recordTransformer.getValueOutputSchema().toString() : "null")
.put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1) // Explicitly disable memory limiter
// in Isolated Process
.put(backendConfig.toProperties())
Expand All @@ -703,13 +710,14 @@ private VeniceConfigLoader buildVeniceConfig() {
return new VeniceConfigLoader(config, config);
}

private void initBackend(
// Visible for testing
public void initBackend(
ClientConfig clientConfig,
VeniceConfigLoader configLoader,
Optional<Set<String>> managedClients,
ICProvider icProvider,
Optional<ObjectCacheConfig> cacheConfig,
Function<Integer, DaVinciRecordTransformer> getRecordTransformer) {
DaVinciRecordTransformerFunctionalInterface getRecordTransformer) {
synchronized (AvroGenericDaVinciClient.class) {
if (daVinciBackend == null) {
logger
Expand Down Expand Up @@ -784,12 +792,21 @@ public synchronized void start() {
this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
this.genericRecordStoreDeserializerCache =
new AvroStoreDeserializerCache(daVinciBackend.get().getSchemaRepository(), getStoreName(), true);
this.storeDeserializerCache = clientConfig.isSpecificClient()
? new AvroSpecificStoreDeserializerCache<>(

if (clientConfig.isSpecificClient()) {
if (recordTransformer != null) {
this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>(
recordTransformer.getValueOutputSchema(),
clientConfig.getSpecificValueClass());
} else {
this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>(
daVinciBackend.get().getSchemaRepository(),
getStoreName(),
clientConfig.getSpecificValueClass())
: (AvroStoreDeserializerCache<V>) this.genericRecordStoreDeserializerCache;
clientConfig.getSpecificValueClass());
}
} else {
this.storeDeserializerCache = (AvroStoreDeserializerCache<V>) this.genericRecordStoreDeserializerCache;
}

ready.set(true);
logger.info("Client is started successfully, storeName=" + getStoreName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public class BlockingDaVinciRecordTransformer<K, V, O> extends DaVinciRecordTran
private final DaVinciRecordTransformer recordTransformer;
private final CountDownLatch startLatch = new CountDownLatch(1);

public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer) {
super(recordTransformer.getStoreVersion());
public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer, boolean storeRecordsInDaVinci) {
super(recordTransformer.getStoreVersion(), storeRecordsInDaVinci);
this.recordTransformer = recordTransformer;
}

Expand All @@ -32,28 +32,31 @@ public Schema getValueOutputSchema() {
return this.recordTransformer.getValueOutputSchema();
}

public O put(Lazy<K> key, Lazy<V> value) {
public O transform(Lazy<K> key, Lazy<V> value) {
return (O) this.recordTransformer.transform(key, value);
}

public void processPut(Lazy<K> key, Lazy<O> value) {
try {
// Waiting for onStartIngestionTask to complete before proceeding
startLatch.await();
return (O) this.recordTransformer.put(key, value);
this.recordTransformer.processPut(key, value);
} catch (InterruptedException e) {
// Restore the interrupt status
Thread.currentThread().interrupt();
return null;
}
}

public O delete(Lazy<K> key) {
return (O) this.recordTransformer.delete(key);
public O processDelete(Lazy<K> key) {
return (O) this.recordTransformer.processDelete(key);
}

public void onStartIngestionTask() {
this.recordTransformer.onStartIngestionTask();
public void onStart() {
this.recordTransformer.onStart();
startLatch.countDown();
}

public void onEndIngestionTask() {
this.recordTransformer.onEndIngestionTask();
public void onEnd() {
this.recordTransformer.onEnd();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.davinci.client;

import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig;
import java.util.function.Function;


public class DaVinciConfig {
Expand Down Expand Up @@ -32,7 +31,7 @@ public class DaVinciConfig {
/**
* Record transformer reference
*/
private Function<Integer, DaVinciRecordTransformer> recordTransformerFunction;
private DaVinciRecordTransformerFunctionalInterface recordTransformerFunction;

/**
* Whether to enable read-path metrics.
Expand Down Expand Up @@ -127,7 +126,7 @@ public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) {
}

public DaVinciConfig setRecordTransformerFunction(
Function<Integer, DaVinciRecordTransformer> recordTransformerFunction) {
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
this.recordTransformerFunction = recordTransformerFunction;
return this;
}
Expand Down
Loading
Loading