[vpj] [common] [da-vinci] Refactor schema source venice-common to consolidate duplicated code in ETL (#1141)

* Refactor schema source venice-common to consolidate duplicated code in ETL
* Invoke persistence for the key schema only if a valid key schema directory is provided
mynameborat authored Sep 9, 2024
1 parent 9494b43 commit 35c1ec1
Showing 5 changed files with 121 additions and 11 deletions.
6 changes: 6 additions & 0 deletions internal/venice-common/build.gradle
@@ -23,6 +23,12 @@ dependencies {
exclude group: 'com.linkedin.container', module: 'container-eventbus-factory' // Keeping it clean.
}

implementation (libraries.hadoopCommon) {
// Exclude transitive dependency
exclude group: 'org.apache.avro'
exclude group: 'javax.servlet'
}

implementation project(':clients:venice-thin-client')

implementation libraries.avroUtilCompatHelper
HDFSSchemaSource.java
@@ -1,10 +1,12 @@
package com.linkedin.venice.hadoop.schema;

import com.google.common.base.Preconditions;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.schema.SchemaUtils;
import com.linkedin.venice.annotation.NotThreadsafe;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.rmd.RmdVersionId;
import com.linkedin.venice.utils.Utils;
@@ -13,8 +15,10 @@
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -42,24 +46,46 @@ public class HDFSSchemaSource implements SchemaSource, AutoCloseable {
private final Path rmdSchemaDir;
private final Path valueSchemaDir;

// For non-ETL (historical) flows, the key schema is part of the push configuration and doesn't require caching
private final boolean includeKeySchema;

private final Path keySchemaDir;

public HDFSSchemaSource(
final Path valueSchemaDir,
final Path rmdSchemaDir,
final Path keySchemaDir,
final String storeName) throws IOException {
Configuration conf = new Configuration();
this.rmdSchemaDir = rmdSchemaDir;
// TODO: Consider either using global filesystem or remove per instance fs and infer filesystem from the path during
// usage
this.fs = this.rmdSchemaDir.getFileSystem(conf);
this.valueSchemaDir = valueSchemaDir;
this.keySchemaDir = keySchemaDir;
this.includeKeySchema = keySchemaDir != null;
this.storeName = storeName;

initialize();
}

private void initialize() throws IOException {
if (!fs.exists(this.rmdSchemaDir)) {
fs.mkdirs(this.rmdSchemaDir);
}

if (!fs.exists(this.valueSchemaDir)) {
fs.mkdirs(this.valueSchemaDir);
}

if (includeKeySchema && !fs.exists(this.keySchemaDir)) {
fs.mkdirs(this.keySchemaDir);
}
}

public HDFSSchemaSource(final Path valueSchemaDir, final Path rmdSchemaDir, final String storeName)
throws IOException {
this(valueSchemaDir, rmdSchemaDir, null, storeName);
}

public HDFSSchemaSource(final String valueSchemaDir, final String rmdSchemaDir, final String storeName)
@@ -71,6 +97,10 @@ public HDFSSchemaSource(final String valueSchemaDir, final String rmdSchemaDir)
this(new Path(valueSchemaDir), new Path(rmdSchemaDir), null);
}

public String getKeySchemaPath() {
return keySchemaDir.toString();
}

public String getRmdSchemaPath() {
return rmdSchemaDir.toString();
}
@@ -87,6 +117,10 @@ public String getValueSchemaPath() {
public void saveSchemasOnDisk(ControllerClient controllerClient) throws IOException, IllegalStateException {
saveSchemaResponseToDisk(controllerClient.getAllReplicationMetadataSchemas(storeName).getSchemas(), true);
saveSchemaResponseToDisk(controllerClient.getAllValueSchema(storeName).getSchemas(), false);

if (includeKeySchema) {
saveKeySchemaToDisk(controllerClient.getKeySchema(storeName));
}
}

void saveSchemaResponseToDisk(MultiSchemaResponse.Schema[] schemas, boolean isRmdSchema)
@@ -121,6 +155,23 @@ void saveSchemaResponseToDisk(MultiSchemaResponse.Schema[] schemas, boolean isRmdSchema)
}
}

void saveKeySchemaToDisk(SchemaResponse schema) throws IOException, IllegalStateException {
Preconditions.checkState(includeKeySchema, "Cannot be invoked with invalid key schema directory");
Preconditions.checkState(!schema.isError(), "Cannot persist schema. Encountered error in schema response");
LOGGER.info("Caching key schema for store: {} in {}", storeName, keySchemaDir.getName());

Path schemaPath = new Path(keySchemaDir, String.valueOf(schema.getId()));
if (!fs.exists(schemaPath)) {
try (FSDataOutputStream outputStream = fs.create(schemaPath);
OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
outputStreamWriter.write(schema.getSchemaStr() + "\n");
outputStreamWriter.flush();
}
} else {
throw new IllegalStateException(String.format("The schema path %s already exists", schemaPath));
}
}

/**
* Fetch all rmd schemas under the {@link #rmdSchemaDir} if available.
*
@@ -173,6 +224,32 @@ public Map<Integer, Schema> fetchValueSchemas() throws IOException {
return mapping;
}

@Override
public Schema fetchKeySchema() throws IOException {
Preconditions.checkState(includeKeySchema, "Cannot be invoked with invalid key schema directory");
FileStatus[] fileStatus = fs.listStatus(keySchemaDir);
LOGGER.info("Fetching key schemas from :{}", keySchemaDir);

Optional<Schema> keySchema = Arrays.stream(fileStatus).map(status -> {
Path path = status.getPath();
Schema outputSchema = null;
try (FSDataInputStream in = fs.open(path);
BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
String schemaStr = reader.readLine();
if (schemaStr != null) {
outputSchema = Schema.parse(schemaStr.trim());
}
} catch (Exception e) {
LOGGER.error("Failed to fetch key schema due to ", e);
}

return outputSchema;
}).filter(Objects::nonNull).findFirst();

return keySchema
.orElseThrow(() -> new RuntimeException(String.format("Failed to load key schema from %s", keySchemaDir)));
}

@Override
public void close() {
Utils.closeQuietlyWithErrorLogged(fs);
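
For context, a minimal usage sketch of the refactored class (hypothetical, not part of this commit: the directory paths, store name, example class, and pre-configured ControllerClient are illustrative assumptions). Passing a key schema directory opts in to key schema caching for the ETL flow; the three-argument constructor preserves the historical, key-schema-less behavior.

// Hypothetical usage sketch; paths and store name are invented for illustration.
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;

import com.linkedin.venice.controllerapi.ControllerClient;

public class HDFSSchemaSourceExample {
  static Schema cacheAndLoadKeySchema(ControllerClient controllerClient) throws IOException {
    try (HDFSSchemaSource source = new HDFSSchemaSource(
        new Path("/tmp/venice/value-schemas"),
        new Path("/tmp/venice/rmd-schemas"),
        new Path("/tmp/venice/key-schemas"),
        "example_store")) {
      source.saveSchemasOnDisk(controllerClient); // persists key, value, and RMD schemas
      return source.fetchKeySchema();             // reads the cached key schema back
    }
  }
}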
SchemaSource.java
@@ -13,4 +13,6 @@ public interface SchemaSource {
Map<RmdVersionId, Schema> fetchRmdSchemas() throws IOException;

Map<Integer, Schema> fetchValueSchemas() throws IOException;

Schema fetchKeySchema() throws IOException;
}
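
To illustrate the expanded contract, a minimal in-memory implementation sketch (hypothetical, not part of this commit; the class name and constructor are invented). It shows that fetchKeySchema() joins the existing value and RMD schema lookups as a first-class part of the interface.

// Hypothetical in-memory SchemaSource, e.g. for tests that don't need HDFS.
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;

import com.linkedin.venice.schema.rmd.RmdVersionId;

public class InMemorySchemaSource implements SchemaSource {
  private final Schema keySchema;
  private final Map<Integer, Schema> valueSchemas;
  private final Map<RmdVersionId, Schema> rmdSchemas;

  public InMemorySchemaSource(
      Schema keySchema,
      Map<Integer, Schema> valueSchemas,
      Map<RmdVersionId, Schema> rmdSchemas) {
    this.keySchema = keySchema;
    this.valueSchemas = valueSchemas;
    this.rmdSchemas = rmdSchemas;
  }

  @Override
  public Map<RmdVersionId, Schema> fetchRmdSchemas() throws IOException {
    return Collections.unmodifiableMap(rmdSchemas);
  }

  @Override
  public Map<Integer, Schema> fetchValueSchemas() throws IOException {
    return Collections.unmodifiableMap(valueSchemas);
  }

  @Override
  public Schema fetchKeySchema() throws IOException {
    return keySchema;
  }
}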
TestHDFSSchemaSource.java
@@ -3,17 +3,19 @@
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.linkedin.davinci.schema.SchemaUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.rmd.RmdVersionId;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -23,6 +25,9 @@ public class TestHDFSSchemaSource {
private final static String TEST_STORE = "test_store";
private final static int numOfSchemas = 3;

private static final String KEY_SCHEMA_STR = "\"string\"";
private static final Schema KEY_SCHEMA = Schema.parse(KEY_SCHEMA_STR);

private final static String VALUE_SCHEMA_STRING =
"{\"type\":\"record\"," + "\"name\":\"User\"," + "\"namespace\":\"example.avro\"," + "\"fields\":["
+ "{\"name\":\"name\",\"type\":\"string\",\"default\":\"venice\"}]}";
@@ -37,8 +42,9 @@ public class TestHDFSSchemaSource {

@BeforeClass
public void setUp() throws IOException {
Path rmdInputDir = new Path(getTempDataDirectory().getAbsolutePath());
Path valueInputDir = new Path(getTempDataDirectory().getAbsolutePath());
Path keyInputDir = new Path(getTempDataDirectory().getAbsolutePath());
client = mock(ControllerClient.class);
MultiSchemaResponse.Schema[] rmdSchemas = generateRmdSchemas(numOfSchemas);
MultiSchemaResponse rmdSchemaResponse = new MultiSchemaResponse();
@@ -47,12 +53,17 @@ public void setUp() throws IOException {
MultiSchemaResponse.Schema[] valueSchemas = generateValueSchema(numOfSchemas);
MultiSchemaResponse valueSchemaResponse = new MultiSchemaResponse();
valueSchemaResponse.setSchemas(valueSchemas);

SchemaResponse keySchemaResponse = new SchemaResponse();
keySchemaResponse.setSchemaStr(KEY_SCHEMA_STR);
keySchemaResponse.setId(1);

doReturn(rmdSchemaResponse).when(client).getAllReplicationMetadataSchemas(TEST_STORE);
doReturn(valueSchemaResponse).when(client).getAllValueSchema(TEST_STORE);
doReturn(keySchemaResponse).when(client).getKeySchema(TEST_STORE);

source = new HDFSSchemaSource(valueInputDir, rmdInputDir, keyInputDir, TEST_STORE);
source.saveSchemasOnDisk(client);
}

@Test
@@ -75,6 +86,20 @@ public void testLoadValueSchemaThenFetch() throws IOException {
}
}

@Test
public void testLoadKeySchemaThenFetch() throws IOException {
Schema actualKeySchema = source.fetchKeySchema();
Assert.assertEquals(actualKeySchema.toString(), KEY_SCHEMA.toString());
}

@Test(expectedExceptions = IllegalStateException.class)
public void testSaveKeySchemaThrowsExceptionWithInvalidResponse() throws IOException {
SchemaResponse mockResponse = mock(SchemaResponse.class);
when(mockResponse.isError()).thenReturn(true);
source.saveKeySchemaToDisk(mockResponse);
}
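
A natural follow-up test would cover the duplicate-write path, since saveKeySchemaToDisk refuses to overwrite an existing schema file. A hedged sketch (not part of this commit; it assumes a fresh SchemaResponse reports isError() as false):

// Hypothetical additional test: setUp() already persisted the key schema with id 1,
// so a second write to the same path must hit the "already exists" branch and throw.
@Test(expectedExceptions = IllegalStateException.class)
public void testSaveKeySchemaTwiceThrowsException() throws IOException {
  SchemaResponse duplicateResponse = new SchemaResponse();
  duplicateResponse.setSchemaStr(KEY_SCHEMA_STR);
  duplicateResponse.setId(1);
  source.saveKeySchemaToDisk(duplicateResponse);
}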

private MultiSchemaResponse.Schema[] generateRmdSchemas(int n) {
MultiSchemaResponse.Schema[] response = new MultiSchemaResponse.Schema[n];
for (int i = 1; i <= n; i++) {
