Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
feat: Add support for setting object metadata Content-Encoding
Browse files Browse the repository at this point in the history
Users who want to leverage the GCS capability to decompress gzip objects server-side when accessing them through the Storage API requested that the previously fixed metadata `Content-Encoding` (default: null) become configurable, so that its value can be set (e.g. to `gzip`) when the connector uploads a new file to the bucket.
https://cloud.google.com/storage/docs/metadata#content-encoding
  • Loading branch information
jclarysse committed Jun 3, 2024
1 parent e8917e4 commit db14bad
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=0.14.0-SNAPSHOT
version=0.13.0-SNAPSHOT

sonatypeUsername=<fill>
sonatypePassword=<fill>
Expand Down
35 changes: 28 additions & 7 deletions src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -253,28 +254,44 @@ void compression(final String compression) {
}

@ParameterizedTest
@ValueSource(strings = { "gzip" })
void contentEncoding(final String compression) {
properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, compression);
// {..., "gzip,gzip"} doesn't yet seem to be supported by GS API in TestContainers
// decoding fails with java.lang.IllegalArgumentException: Illegal base64 character 1f
@CsvSource({ "none,none" })
void contentEncodingAwareDownload(final String compression, final String encoding) {
properties.put(GcsSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, encoding);
final GcsSinkTask task = new GcsSinkTask(properties, storage);

task.put(basicRecords);
task.flush(null);

final CompressionType compressionType = CompressionType.forName(compression);

final List<String> names = Lists.newArrayList("topic0-0-10");
final List<String> names = Lists.newArrayList("topic0-0-10", "topic0-1-20", "topic0-2-50", "topic1-0-30",
"topic1-1-40");
final List<String> blobNames = names.stream()
.map(n -> n + compressionType.extension())
.collect(Collectors.toList());

assertIterableEquals(blobNames, testBucketAccessor.getBlobNames());
// reading a gzip-compressed blob with metadata Content-Encoding=gzip should be the same as reading a
// non-compressed blob
// given a blob with metadata Content-Encoding equal to its byte compression,
// the result of its GS-downloaded bytes is automatically un-compressed (gzip support only)
// see https://cloud.google.com/storage/docs/metadata#content-encoding
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value0"), Collections.singletonList("value5")),
readSplittedAndDecodedLinesFromBlob("topic0-0-10" + compressionType.extension(), "none", 0));
readDecodedFieldsFromDownload("topic0-0-10" + compressionType.extension(), 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value1"), Collections.singletonList("value6")),
readDecodedFieldsFromDownload("topic0-1-20" + compressionType.extension(), 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value4"), Collections.singletonList("value9")),
readDecodedFieldsFromDownload("topic0-2-50" + compressionType.extension(), 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value2"), Collections.singletonList("value7")),
readDecodedFieldsFromDownload("topic1-0-30" + compressionType.extension(), 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value3"), Collections.singletonList("value8")),
readDecodedFieldsFromDownload("topic1-1-40" + compressionType.extension(), 0));
}

@ParameterizedTest
Expand Down Expand Up @@ -770,6 +787,10 @@ private Collection<List<String>> readSplittedAndDecodedLinesFromBlob(final Strin
return testBucketAccessor.readAndDecodeLines(blobName, compression, fieldsToDecode);
}

/**
 * Downloads the named blob via the Storage API download path (which applies
 * server-side Content-Encoding handling) and base64-decodes the given
 * comma-separated fields of each line. Delegates to the test bucket accessor.
 */
private Collection<List<String>> readDecodedFieldsFromDownload(final String blobName, final int... fieldsToDecode) {
return testBucketAccessor.downloadBlobAndDecodeFields(blobName, fieldsToDecode);
}

private Map<String, Collection<List<String>>> buildBlobNameValuesMap(final String compression) {
final CompressionType compressionType = CompressionType.forName(compression);
final String extension = compressionType.extension();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
Expand All @@ -41,6 +44,7 @@

import com.github.luben.zstd.ZstdInputStream;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.xerial.snappy.SnappyInputStream;
Expand All @@ -53,6 +57,7 @@ public final class BucketAccessor {
private List<String> blobNamesCache;
private final Map<String, String> stringContentCache = new HashMap<>();
private final Map<String, List<String>> linesCache = new HashMap<>();
private final Map<String, List<String>> downloadedLinesCache = new HashMap<>();
private final Map<String, List<List<String>>> decodedLinesCache = new HashMap<>();

public BucketAccessor(final Storage storage, final String bucketName, final boolean cache) {
Expand Down Expand Up @@ -121,6 +126,7 @@ public void clear(final String prefix) {
stringContentCache.clear();
linesCache.clear();
decodedLinesCache.clear();
downloadedLinesCache.clear();
}
}

Expand Down Expand Up @@ -165,7 +171,46 @@ private List<String> readLines0(final String blobName, final String compression)
InputStream decompressedStream = getDecompressedStream(bais, compression);
InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(reader)) {
return bufferedReader.lines().collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e); // NOPMD
}
}

/**
 * Reads the named blob through the Storage download API and returns its lines,
 * consulting the per-accessor cache when caching is enabled.
 */
public List<String> downloadBlobAndReadLines(final String blobName) {
    Objects.requireNonNull(blobName, "blobName cannot be null");
    // No caching configured: always perform a fresh download.
    if (!cache) {
        return downloadBlobAndReadLines0(blobName);
    }
    return downloadedLinesCache.computeIfAbsent(blobName, this::downloadBlobAndReadLines0);
}

/**
 * Downloads the blob to a temporary file, reads its lines, and always deletes
 * the temporary file afterwards.
 */
private List<String> downloadBlobAndReadLines0(final String blobName) {
    final String filePath = downloadBlobToTempFile(blobName);
    try {
        return readDownloadedBytes(filePath);
    } finally {
        // Bug fix: deletion previously lived inside an `assert`, so the temp
        // file leaked whenever the JVM ran without -ea (assertions disabled).
        final File file = new File(filePath);
        if (file.isFile() && !file.delete()) {
            // Best-effort cleanup: a failed delete should not fail the read.
            file.deleteOnExit();
        }
    }
}

/**
 * Downloads the named blob into a fresh temporary file and returns the file's
 * absolute path. The caller is responsible for deleting the file.
 *
 * @throws RuntimeException wrapping any {@link IOException} from temp-file creation
 */
private String downloadBlobToTempFile(final String blobName) {
    try {
        final File file = File.createTempFile("tmp", null);
        final String filePath = file.getAbsolutePath();
        // Build the BlobId directly rather than string-concatenating a gs:// URI:
        // fromGsUtilUri parsing is fragile for blob names containing characters
        // that would need escaping in a URI.
        storage.downloadTo(BlobId.of(bucketName, blobName), Paths.get(filePath));
        return filePath;
    } catch (final IOException e) {
        throw new RuntimeException(e); // NOPMD
    }
}

private List<String> readDownloadedBytes(final String filePath) {
try (FileInputStream fis = new FileInputStream(filePath); // NOPMD
InputStreamReader isr = new InputStreamReader(fis, "UTF-8");
BufferedReader bufferedReader = new BufferedReader(isr);) {
return bufferedReader.lines().collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e); // NOPMD
Expand Down Expand Up @@ -211,6 +256,25 @@ private List<List<String>> readAndDecodeLines0(final String blobName, final Stri
.collect(Collectors.toList());
}

/**
 * Downloads the named blob via the Storage download API, splits each line on
 * commas, and base64-decodes the fields at the given indices.
 *
 * NOTE(review): this reuses {@code decodedLinesCache}, which appears to also be
 * used by the compression-aware read path, and the key is the blob name alone
 * (ignoring {@code fieldsToDecode}). If both paths ever read the same blob name,
 * or the same blob is read with different field indices, a stale cached result
 * could be returned — verify against the other cache user, or use a dedicated
 * cache keyed on name + fields.
 */
public List<List<String>> downloadBlobAndDecodeFields(final String blobName, final int... fieldsToDecode) {
Objects.requireNonNull(blobName, "blobName cannot be null");
Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");

if (cache) {
return decodedLinesCache.computeIfAbsent(blobName,
k -> downloadBlobAndDecodeFields0(blobName, fieldsToDecode));
} else {
return downloadBlobAndDecodeFields0(blobName, fieldsToDecode);
}
}

/**
 * Uncached worker: downloads the blob's lines, splits each on commas, and
 * base64-decodes the requested field positions of every line.
 */
private List<List<String>> downloadBlobAndDecodeFields0(final String blobName, final int... fieldsToDecode) {
    return downloadBlobAndReadLines(blobName)
            .stream()
            .map(line -> decodeRequiredFields(line.split(","), fieldsToDecode))
            .collect(Collectors.toList());
}

private List<String> decodeRequiredFields(final String[] originalFields, final int[] fieldsToDecode) {
Objects.requireNonNull(originalFields, "originalFields cannot be null");
Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");
Expand Down

0 comments on commit db14bad

Please sign in to comment.