Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Added the referred to Aiven
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Aug 7, 2023
1 parent c360123 commit 0784a43
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
7 changes: 4 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ To send us a pull request, please:
2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change.
3. Ensure local tests pass.
4. Commit to your fork using clear commit messages.
5. Send us a pull request, answering any default questions in the pull request interface.
6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation.
7. Before merging, clean up the commit history for the PR. Each commit should be self-contained with an informative message, since each commit will be added to the history for this project.
5. Be compliant with our formatting style by running `./gradlew spotlessApply`
6. Send us a pull request, answering any default questions in the pull request interface.
7. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation.
8. Before merging, clean up the commit history for the PR. Each commit should be self-contained with an informative message, since each commit will be added to the history for this project.

GitHub provides additional document on [forking a repository](https://help.github.com/articles/fork-a-repo/) and
[creating a pull request](https://help.github.com/articles/creating-a-pull-request/).
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,20 @@
import io.aiven.kafka.connect.common.output.OutputWriter;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class GcsSinkTask extends SinkTask {
public static final String CONNECTOR_USER_AGENT = "kafka_connect_gcs";
private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConnector.class);
private static final String USER_AGENT_HEADER_KEY = "user-agent";
private static final String USER_AGENT_HEADER_FORMAT = "%s (GPN: Aiven;) Google BigQuery Sink/%s";
public static final String USER_AGENT_HEADER_VALUE = String.format(USER_AGENT_HEADER_FORMAT, CONNECTOR_USER_AGENT,
Version.VERSION);

private RecordGrouper recordGrouper;

Expand Down Expand Up @@ -72,6 +78,7 @@ public void start(final Map<String, String> props) {
this.storage = StorageOptions.newBuilder()
.setHost(config.getGcsEndpoint())
.setCredentials(config.getCredentials())
.setHeaderProvider(FixedHeaderProvider.create(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE))
.setRetrySettings(RetrySettings.newBuilder()
.setInitialRetryDelay(config.getGcsRetryBackoffInitialDelay())
.setMaxRetryDelay(config.getGcsRetryBackoffMaxDelay())
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.aiven.kafka.connect.gcs.testutils.Record;
import io.aiven.kafka.connect.gcs.testutils.Utils;

import com.google.api.gax.rpc.NoHeaderProvider;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -382,6 +383,13 @@ void setupDefaultRetryPolicy() {

verify(mockedContext, never()).timeout(anyLong());

final Map<String, String> headers = storage.getOptions()
.getMergedHeaderProvider(new NoHeaderProvider())
.getHeaders();

assertThat(headers.size()).isEqualTo(1);
assertThat(headers.get("user-agent"))
.isEqualTo("kafka_connect_gcs (GPN: Aiven;) Google BigQuery Sink/test-version");
assertThat(retrySettings.isJittered()).isTrue();
assertThat(retrySettings.getInitialRetryDelay())
.isEqualTo(Duration.ofMillis(GcsSinkConfig.GCS_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT));
Expand Down Expand Up @@ -415,6 +423,13 @@ void setupCustomRetryPolicy() {

verify(mockedContext).timeout(kafkaBackoffMsCaptor.capture());

final Map<String, String> headers = storage.getOptions()
.getMergedHeaderProvider(new NoHeaderProvider())
.getHeaders();

assertThat(headers.size()).isEqualTo(1);
assertThat(headers.get("user-agent"))
.isEqualTo("kafka_connect_gcs (GPN: Aiven;) Google BigQuery Sink/test-version");
assertThat(retrySettings.isJittered()).isTrue();
assertThat(kafkaBackoffMsCaptor.getValue()).isEqualTo(1L);
assertThat(retrySettings.getInitialRetryDelay()).isEqualTo(Duration.ofMillis(2L));
Expand Down

0 comments on commit 0784a43

Please sign in to comment.