Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
feat: Add support for setting object metadata Content-Encoding
Browse files Browse the repository at this point in the history
Users who want to leverage the GCS capability to decompress gzip objects
server-side when accessing them through the Storage API requested that
the previously fixed object metadata `Content-Encoding` (default: null)
become configurable, so that its value can be set (e.g. to `gzip`) when
the connector uploads a new file to the bucket.
https://cloud.google.com/storage/docs/metadata#content-encoding
  • Loading branch information
jclarysse committed May 30, 2024
1 parent 227fb1d commit b3750e2
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,10 @@ gcs.credentials.json={"type":"...", ...}
gcs.credentials.default=true
##

# The value of object metadata Content-Encoding.
# This can be used to leverage storage-side decompression before download.
# Optional, the default is null.
gcs.object.content.encoding=gzip

# The set of the fields that are to be output, comma separated.
# Supported values are: `key`, `value`, `offset`, `timestamp`, and `headers`.
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ public final class GcsSinkConfig extends AivenCommonConfig {
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding";
public static final String GCS_USER_AGENT = "gcs.user.agent";
private static final String GROUP_FILE = "File";
public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix";
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type";

public static final String FILE_MAX_RECORDS = "file.max.records";
public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
Expand Down Expand Up @@ -135,6 +137,11 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
+ GCS_CREDENTIALS_PATH_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);

configDef.define(GCS_OBJECT_CONTENT_ENCODING_CONFIG, ConfigDef.Type.STRING, null,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW,
"The GCS object metadata value of Content-Encoding.", GROUP_GCS, gcsGroupCounter++,
ConfigDef.Width.NONE, GCS_OBJECT_CONTENT_ENCODING_CONFIG);

configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
"The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD
Expand Down Expand Up @@ -332,7 +339,7 @@ private void validate() {
.filter(Objects::nonNull)
.count();

// only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
// only validate non nulls here, since all nulls means falling back to the default "no credential" behaviour.
if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) {
throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.",
GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG));
Expand Down Expand Up @@ -371,6 +378,10 @@ public String getBucketName() {
return getString(GCS_BUCKET_NAME_CONFIG);
}

/**
 * Returns the value to be set as the {@code Content-Encoding} object metadata
 * on uploaded blobs, or {@code null} when the option is not configured.
 */
public String getObjectContentEncoding() {
    final String contentEncoding = getString(GCS_OBJECT_CONTENT_ENCODING_CONFIG);
    return contentEncoding;
}

@Override
public CompressionType getCompressionType() {
return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG));
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}

private void flushFile(final String filename, final List<SinkRecord> records) {
final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename).build();
final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename)
.setContentEncoding(config.getObjectContentEncoding())
.build();
try (var out = Channels.newOutputStream(storage.writer(blob));
var writer = OutputWriter.builder()
.withExternalProperties(config.originalsStrings())
Expand Down
25 changes: 25 additions & 0 deletions src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,31 @@ void compression(final String compression) {
readSplittedAndDecodedLinesFromBlob("topic1-1-40" + compressionType.extension(), compression, 0));
}

@ParameterizedTest
@ValueSource(strings = { "gzip" })
void contentEncoding(final String compression) {
    // Use the same value for the file compression and for the object metadata
    // Content-Encoding so that the blob is stored gzip-compressed and flagged as such.
    properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, compression);
    properties.put(GcsSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
    final GcsSinkTask task = new GcsSinkTask(properties, storage);

    task.put(basicRecords);
    task.flush(null);

    final CompressionType compressionType = CompressionType.forName(compression);
    final String blobName = "topic0-0-10" + compressionType.extension();

    assertIterableEquals(Collections.singletonList(blobName), testBucketAccessor.getBlobNames());
    // Reading a gzip-compressed blob whose metadata carries Content-Encoding=gzip must be
    // indistinguishable from reading an uncompressed blob, hence "none" is passed here.
    assertIterableEquals(
            List.of(List.of("value0"), List.of("value5")),
            readSplittedAndDecodedLinesFromBlob(blobName, "none", 0));
}

@ParameterizedTest
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
void allFields(final String compression) {
Expand Down

0 comments on commit b3750e2

Please sign in to comment.