feat: Add support for setting object metadata Content-Encoding #359

Merged
4 changes: 4 additions & 0 deletions README.md
@@ -589,6 +589,10 @@ gcs.credentials.json={"type":"...", ...}
gcs.credentials.default=true
##

# The value of object metadata Content-Encoding.
# This can be used to leverage storage-side decompression before download.
# Optional, the default is null.
gcs.object.content.encoding=gzip

# The set of the fields that are to be output, comma separated.
# Supported values are: `key`, `value`, `offset`, `timestamp`, and `headers`.
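For context on the storage-side decompression mentioned above: objects stored with Content-Encoding: gzip are decompressed by GCS in transit on download (decompressive transcoding, see https://cloud.google.com/storage/docs/transcoding). A minimal round-trip sketch using the google-cloud-storage Java client; the bucket and object names are hypothetical and default transcoding behaviour is assumed:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPOutputStream;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class TranscodingSketch {
    public static void main(final String[] args) throws Exception {
        final Storage storage = StorageOptions.getDefaultInstance().getService();

        // Gzip the payload before uploading, matching file.compression.type=gzip.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write("value0\nvalue5\n".getBytes(StandardCharsets.UTF_8));
        }

        // Content-Encoding: gzip declares that the stored bytes are gzip-compressed.
        final BlobInfo blob = BlobInfo.newBuilder("my-bucket", "topic0-0-10.gz")
                .setContentEncoding("gzip")
                .build();
        storage.create(blob, buffer.toByteArray());

        // With matching metadata, the downloaded file arrives as plain text.
        final Path target = Files.createTempFile("download", null);
        storage.downloadTo(BlobId.of("my-bucket", "topic0-0-10.gz"), target);
        System.out.println(Files.readString(target));
    }
}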
13 changes: 12 additions & 1 deletion src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
@@ -57,11 +57,13 @@ public final class GcsSinkConfig extends AivenCommonConfig {
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding";
public static final String GCS_USER_AGENT = "gcs.user.agent";
private static final String GROUP_FILE = "File";
public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix";
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type";

public static final String FILE_MAX_RECORDS = "file.max.records";
public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
@@ -135,6 +137,11 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
+ GCS_CREDENTIALS_PATH_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);

configDef.define(GCS_OBJECT_CONTENT_ENCODING_CONFIG, ConfigDef.Type.STRING, null,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW,
"The GCS object metadata value of Content-Encoding.", GROUP_GCS, gcsGroupCounter++,
ConfigDef.Width.NONE, GCS_OBJECT_CONTENT_ENCODING_CONFIG);

configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
"The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD
@@ -332,7 +339,7 @@ private void validate() {
.filter(Objects::nonNull)
.count();

- // only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
+ // only validate non nulls here, since all nulls means falling back to the default "no credential" behaviour.
if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) {
throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.",
GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG));
@@ -371,6 +378,10 @@ public String getBucketName() {
return getString(GCS_BUCKET_NAME_CONFIG);
}

public String getObjectContentEncoding() {
return getString(GCS_OBJECT_CONTENT_ENCODING_CONFIG);
}

@Override
public CompressionType getCompressionType() {
return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG));
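A detail that is easy to miss in the define(...) call above: the default is null, yet a NonEmptyString validator is attached. In Kafka's ConfigDef, NonEmptyString permits null, so the unset default parses cleanly, while an explicit empty string is rejected at parse time. A self-contained sketch against the vanilla Kafka API (the standalone ConfigDef here is hypothetical, not the connector's own):

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class ContentEncodingConfigSketch {
    public static void main(final String[] args) {
        final ConfigDef def = new ConfigDef().define("gcs.object.content.encoding", ConfigDef.Type.STRING,
                null, // null default keeps the option truly optional
                new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW, "Content-Encoding for uploaded objects.");

        // Unset: getString returns the null default.
        System.out.println(new AbstractConfig(def, Map.of()).getString("gcs.object.content.encoding"));

        // Set: the value passes through unchanged.
        System.out.println(new AbstractConfig(def, Map.of("gcs.object.content.encoding", "gzip"))
                .getString("gcs.object.content.encoding"));

        // Empty: NonEmptyString rejects it when the config is parsed.
        try {
            new AbstractConfig(def, Map.of("gcs.object.content.encoding", ""));
        } catch (final ConfigException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}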
4 changes: 3 additions & 1 deletion src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java
@@ -118,7 +118,9 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}

private void flushFile(final String filename, final List<SinkRecord> records) {
- final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename).build();
+ final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename)
+         .setContentEncoding(config.getObjectContentEncoding())
+         .build();
try (var out = Channels.newOutputStream(storage.writer(blob));
var writer = OutputWriter.builder()
.withExternalProperties(config.originalsStrings())
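The builder change above is backward compatible: with gcs.object.content.encoding unset, getObjectContentEncoding() returns null, and the google-cloud-storage builder treats a null Content-Encoding as absent metadata (an assumption of this sketch, consistent with the connector's existing tests passing without the option). A tiny illustration with hypothetical names:

import com.google.cloud.storage.BlobInfo;

public class NullEncodingSketch {
    public static void main(final String[] args) {
        final BlobInfo implicit = BlobInfo.newBuilder("my-bucket", "topic0-0-10").build();
        final BlobInfo explicit = BlobInfo.newBuilder("my-bucket", "topic0-0-10")
                .setContentEncoding(null)
                .build();
        // Expected to print null twice: a null Content-Encoding is simply unset metadata.
        System.out.println(implicit.getContentEncoding());
        System.out.println(explicit.getContentEncoding());
    }
}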
45 changes: 45 additions & 0 deletions src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java
@@ -63,6 +63,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.threeten.bp.Duration;
@@ -252,6 +253,45 @@ void compression(final String compression) {
readSplittedAndDecodedLinesFromBlob("topic1-1-40" + compressionType.extension(), compression, 0));
}

@ParameterizedTest
@CsvSource({ "none,none", "gzip,none", "none,gzip", "gzip,gzip" })
void contentEncodingAwareDownload(final String compression, final String encoding) {
properties.put(GcsSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, encoding);
final GcsSinkTask task = new GcsSinkTask(properties, storage);

task.put(basicRecords);
task.flush(null);

final CompressionType compressionType = CompressionType.forName(compression);

final List<String> names = Lists.newArrayList("topic0-0-10", "topic0-1-20", "topic0-2-50", "topic1-0-30",
"topic1-1-40");
final List<String> blobNames = names.stream()
.map(n -> n + compressionType.extension())
.collect(Collectors.toList());

assertIterableEquals(blobNames, testBucketAccessor.getBlobNames());
// given a blob whose metadata Content-Encoding matches its byte compression,
// the bytes downloaded from GCS are automatically decompressed (gzip only);
// see https://cloud.google.com/storage/docs/metadata#content-encoding
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value0"), Collections.singletonList("value5")),
readDecodedFieldsFromDownload("topic0-0-10" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value1"), Collections.singletonList("value6")),
readDecodedFieldsFromDownload("topic0-1-20" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value4"), Collections.singletonList("value9")),
readDecodedFieldsFromDownload("topic0-2-50" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value2"), Collections.singletonList("value7")),
readDecodedFieldsFromDownload("topic1-0-30" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value3"), Collections.singletonList("value8")),
readDecodedFieldsFromDownload("topic1-1-40" + compressionType.extension(), compression, 0));
}

@ParameterizedTest
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
void allFields(final String compression) {
@@ -745,6 +785,11 @@ private Collection<List<String>> readSplittedAndDecodedLinesFromBlob(final Strin
return testBucketAccessor.readAndDecodeLines(blobName, compression, fieldsToDecode);
}

private Collection<List<String>> readDecodedFieldsFromDownload(final String blobName, final String compression,
final int... fieldsToDecode) {
return testBucketAccessor.downloadBlobAndDecodeFields(blobName, compression, fieldsToDecode);
}

private Map<String, Collection<List<String>>> buildBlobNameValuesMap(final String compression) {
final CompressionType compressionType = CompressionType.forName(compression);
final String extension = compressionType.extension();
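The @CsvSource matrix above crosses the file's actual byte compression with the declared Content-Encoding metadata, so all four combinations are exercised end to end. When debugging such tests, it helps to check whether a downloaded file is still gzip-compressed on disk; a small, hypothetical pure-Java probe for the gzip magic bytes (0x1f 0x8b):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class GzipProbe {
    // Every gzip stream starts with the two magic bytes 0x1f 0x8b.
    public static boolean looksGzipCompressed(final Path file) throws IOException {
        final byte[] bytes = Files.readAllBytes(file);
        return bytes.length >= 2 && (bytes[0] & 0xff) == 0x1f && (bytes[1] & 0xff) == 0x8b;
    }

    public static void main(final String[] args) throws IOException {
        System.out.println(looksGzipCompressed(Path.of(args[0])));
    }
}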
BucketAccessor.java
@@ -18,10 +18,14 @@

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
@@ -41,6 +45,7 @@

import com.github.luben.zstd.ZstdInputStream;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.xerial.snappy.SnappyInputStream;
@@ -53,6 +58,7 @@ public final class BucketAccessor {
private List<String> blobNamesCache;
private final Map<String, String> stringContentCache = new HashMap<>();
private final Map<String, List<String>> linesCache = new HashMap<>();
private final Map<String, List<String>> downloadedLinesCache = new HashMap<>();
private final Map<String, List<List<String>>> decodedLinesCache = new HashMap<>();

public BucketAccessor(final Storage storage, final String bucketName, final boolean cache) {
@@ -121,6 +127,7 @@ public void clear(final String prefix) {
stringContentCache.clear();
linesCache.clear();
decodedLinesCache.clear();
downloadedLinesCache.clear();
}
}

@@ -165,13 +172,49 @@ private List<String> readLines0(final String blobName, final String compression)
InputStream decompressedStream = getDecompressedStream(bais, compression);
InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(reader)) {

return bufferedReader.lines().collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e); // NOPMD
}
}

public List<String> downloadBlobAndReadLines(final String blobName, final String compression) {
Objects.requireNonNull(blobName, "blobName cannot be null");
Objects.requireNonNull(compression, "compression cannot be null");
if (cache) {
return downloadedLinesCache.computeIfAbsent(blobName,
k -> downloadBlobAndReadLines0(blobName, compression));
} else {
return downloadBlobAndReadLines0(blobName, compression);
}
}

private List<String> downloadBlobAndReadLines0(final String blobName, final String compression) {
final String filePath = downloadBlobToTempFile(blobName);
try {
final byte[] bytes = Files.readAllBytes(Path.of(filePath));
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
InputStream decompressedStream = getDecompressedStream(bais, compression);
InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(reader)) {
return bufferedReader.lines().collect(Collectors.toList());
}
} catch (IOException exception) {
throw new RuntimeException(exception); // NOPMD
}
}

private String downloadBlobToTempFile(final String blobName) {
try {
final File file = File.createTempFile("tmp", null);
final String filePath = file.getAbsolutePath();
storage.downloadTo(BlobId.fromGsUtilUri("gs://" + bucketName + "/" + blobName), Paths.get(filePath));
return filePath;
} catch (final IOException e) {
throw new RuntimeException(e); // NOPMD
}
}

private InputStream getDecompressedStream(final InputStream inputStream, final String compression)
throws IOException {
Objects.requireNonNull(inputStream, "inputStream cannot be null");
@@ -211,6 +254,27 @@ private List<List<String>> readAndDecodeLines0(final String blobName, final Stri
.collect(Collectors.toList());
}

public List<List<String>> downloadBlobAndDecodeFields(final String blobName, final String compression,
final int... fieldsToDecode) {
Objects.requireNonNull(blobName, "blobName cannot be null");
Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");

if (cache) {
return decodedLinesCache.computeIfAbsent(blobName,
k -> downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode));
} else {
return downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode);
}
}

private List<List<String>> downloadBlobAndDecodeFields0(final String blobName, final String compression,
final int... fieldsToDecode) {
return downloadBlobAndReadLines(blobName, compression).stream()
.map(l -> l.split(","))
.map(fields -> decodeRequiredFields(fields, fieldsToDecode))
.collect(Collectors.toList());
}

private List<String> decodeRequiredFields(final String[] originalFields, final int[] fieldsToDecode) {
Objects.requireNonNull(originalFields, "originalFields cannot be null");
Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");
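A small robustness note on downloadBlobToTempFile above: the temporary file is never deleted, and the BlobId is built by formatting a gs:// URI only to parse it straight back. A hedged alternative sketch of the same method; it assumes the surrounding class's storage and bucketName fields and keeps the existing error handling:

// Hypothetical drop-in for BucketAccessor (requires java.nio.file.Files,
// java.nio.file.Path and com.google.cloud.storage.BlobId imports).
private String downloadBlobToTempFile(final String blobName) {
    try {
        final Path tempFile = Files.createTempFile("bucket-accessor-", null);
        tempFile.toFile().deleteOnExit(); // avoid accumulating temp files across test runs
        // BlobId.of(bucket, name) sidesteps building a gs:// URI just to parse it back.
        storage.downloadTo(BlobId.of(bucketName, blobName), tempFile);
        return tempFile.toString();
    } catch (final IOException e) {
        throw new RuntimeException(e); // NOPMD
    }
}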