From b9ef784c55af559b1323eaa667f699df3c30e42a Mon Sep 17 00:00:00 2001 From: Eugeniu Date: Thu, 5 Oct 2023 14:11:54 +0100 Subject: [PATCH 1/4] Add Http Method configuration. Tto be used for sending data to target endpoint, Defaulted on POST Method. Supported POST/PUT --- README.md | 2 +- docs/sink-connector-config-options.rst | 8 +++ .../connect/http/config/HttpMethodsType.java | 45 ++++++++++++++++ .../connect/http/config/HttpSinkConfig.java | 40 +++++++++++++++ .../http/sender/AbstractHttpSender.java | 17 ++++++- .../http/sender/DefaultHttpSenderTest.java | 51 ++++++++++++++++++- 6 files changed, 159 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java diff --git a/README.md b/README.md index 9bef204..474f273 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ We consider the connector **stable** despite the major version is currently 0. ## How it works -The connector uses the POST HTTP method to deliver records. +As default the connector uses the POST HTTP method to deliver records. The connector supports: - authorization (static, OAuth2); diff --git a/docs/sink-connector-config-options.rst b/docs/sink-connector-config-options.rst index baefe5e..aa4c885 100644 --- a/docs/sink-connector-config-options.rst +++ b/docs/sink-connector-config-options.rst @@ -12,6 +12,14 @@ Connection * Valid Values: HTTP(S) URL * Importance: high +``http.method`` + The HTTP Method to use when send the data. + + * Type: string + * Default: "POST" + * Valid Values: [POST, PUT] + * Importance: low + ``http.authorization.type`` The HTTP authorization type. diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java new file mode 100644 index 0000000..4172be0 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Aiven Oy and http-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.config; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.stream.Collectors; + +public enum HttpMethodsType +{ + POST("POST"), + PUT("PUT"); + + public final String name; + + HttpMethodsType(final String name) { + this.name = name; + } + + public static HttpMethodsType forName(final String name) { + Objects.requireNonNull(name); + return Arrays.stream(values()) + .filter(v -> v.name.equalsIgnoreCase(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("HTTP Method type: " + name)); + } + + public static final Collection NAMES = + Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList()); +} diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index 8cd2657..b1864fa 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -37,6 +37,7 @@ public class HttpSinkConfig extends AbstractConfig { private static final String CONNECTION_GROUP = "Connection"; private static final String HTTP_URL_CONFIG = "http.url"; + private static final String HTTP_METHOD = "http.method"; private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type"; private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization"; @@ -314,6 +315,41 @@ public String toString() { List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG) ); + + configDef.define( + HTTP_METHOD, + ConfigDef.Type.STRING, + "POST", + new ConfigDef.Validator() { + @Override + @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE") // Suppress the ConfigException with null value. + public void ensureValid(final String name, final Object value) { + if (value == null) { + throw new ConfigException(HTTP_METHOD, value); + } + assert value instanceof String; + final String valueStr = (String) value; + if (!HttpMethodsType.NAMES.contains(valueStr)) { + throw new ConfigException( + HTTP_METHOD, valueStr, + "supported values are: " + HttpMethodsType.NAMES); + } + } + + @Override + public String toString() { + return HttpMethodsType.NAMES.toString(); + } + }, + ConfigDef.Importance.LOW, + "The HTTP Method to use when send the data.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.SHORT, + HTTP_METHOD, + FixedSetRecommender.ofSupportedValues(HttpMethodsType.NAMES) + ); + } private static void addBatchingConfigGroup(final ConfigDef configDef) { @@ -548,6 +584,10 @@ public final URI httpUri() { return toURI(HTTP_URL_CONFIG); } + public final HttpMethodsType httpMethod() { + return HttpMethodsType.valueOf(getString(HTTP_METHOD)); + } + public final Long kafkaRetryBackoffMs() { return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG); } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java index 0fa5f52..b84463e 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java @@ -48,12 +48,25 @@ protected AbstractHttpSender( } public final HttpResponse send(final String body) { - final var requestBuilder = - httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + final var requestBuilder = prepareRequest(body); return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER, config.maxRetries()); } + // seth http bethod based on config + private Builder prepareRequest(final String body) { + switch (config.httpMethod()) { + case POST: + return httpRequestBuilder + .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + case PUT: + return httpRequestBuilder + .build(config).PUT(HttpRequest.BodyPublishers.ofString(body)); + default: + throw new ConnectException("Unsupported HTTP method: " + config.httpMethod()); + } + } + /** * Sends an HTTP body using {@code httpSender}, respecting the configured retry policy. * diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index e666d4d..9d72d15 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; @@ -88,7 +89,7 @@ void shouldBuildDefaultHttpRequest() throws Exception { .isPresent() .get(as(InstanceOfAssertFactories.DURATION)) .hasSeconds(config.httpTimeout()); - assertThat(httpRequest.method()).isEqualTo("POST"); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.POST.name()); assertThat(httpRequest .headers() @@ -101,6 +102,54 @@ void shouldBuildDefaultHttpRequest() throws Exception { } + @Test + void shouldBuildDefaultHttpPutRequest() throws Exception { + final var configBase = new HashMap<>(defaultConfig()); + configBase.put( "http.method", "PUT"); + + // Build the configuration + final HttpSinkConfig config = new HttpSinkConfig(configBase); + + // Mock the Http Client and Http Response + when(mockedClient.send(any(HttpRequest.class), any(BodyHandler.class))).thenReturn(mockedResponse); + + // Create a spy on the HttpSender implementation to capture methods parameters + final var httpSender = Mockito.spy(new DefaultHttpSender(config, mockedClient)); + + // Trigger the client + final List messages = List.of("some message"); + messages.forEach(httpSender::send); + + // Capture the RequestBuilder + final ArgumentCaptor defaultHttpRequestBuilder = ArgumentCaptor.forClass(HttpRequest.Builder.class); + verify(httpSender, atLeast(messages.size())).sendWithRetries(defaultHttpRequestBuilder.capture(), + any(HttpResponseHandler.class), anyInt()); + + // Retrieve the builders and rebuild the HttpRequests to check the HttpRequest proper configuration + defaultHttpRequestBuilder + .getAllValues() + .stream() + .map(Builder::build) + .forEach(httpRequest -> { + assertThat(httpRequest.uri()).isEqualTo(config.httpUri()); + assertThat(httpRequest.timeout()) + .isPresent() + .get(as(InstanceOfAssertFactories.DURATION)) + .hasSeconds(config.httpTimeout()); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.PUT.name()); + + assertThat(httpRequest + .headers() + .firstValue(HttpRequestBuilder.HEADER_CONTENT_TYPE)).isEmpty(); + }); + + // Check the messages have been sent once + messages.forEach( + message -> bodyPublishers.verify(() -> HttpRequest.BodyPublishers.ofString(eq(message)), times(1))); + + } + + @Test void shouldBuildCustomHttpRequest() throws Exception { final var configBase = new HashMap<>(defaultConfig()); From 317dfe5e26c306a3e214e75459a02a878599cb6c Mon Sep 17 00:00:00 2001 From: Cararus Eugeniu Date: Mon, 30 Oct 2023 12:56:27 +0000 Subject: [PATCH 2/4] Update HttpMethodsType.java Fix linter reported issue --- .../io/aiven/kafka/connect/http/config/HttpMethodsType.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java index 4172be0..f0a5ad6 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java @@ -21,8 +21,7 @@ import java.util.Objects; import java.util.stream.Collectors; -public enum HttpMethodsType -{ +public enum HttpMethodsType { POST("POST"), PUT("PUT"); From 23c601a8fa2322d5613202b900bda4a5a418bda6 Mon Sep 17 00:00:00 2001 From: Cararus Eugeniu Date: Mon, 30 Oct 2023 12:59:44 +0000 Subject: [PATCH 3/4] Update DefaultHttpSenderTest.java Fix Linter reported issue --- .../kafka/connect/http/sender/DefaultHttpSenderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index 9d72d15..6ea5db7 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -25,10 +25,10 @@ import java.util.Map; import io.aiven.kafka.connect.http.config.HttpMethodsType; -import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; +import org.apache.kafka.connect.errors.ConnectException; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -105,7 +105,7 @@ void shouldBuildDefaultHttpRequest() throws Exception { @Test void shouldBuildDefaultHttpPutRequest() throws Exception { final var configBase = new HashMap<>(defaultConfig()); - configBase.put( "http.method", "PUT"); + configBase.put("http.method", "PUT"); // Build the configuration final HttpSinkConfig config = new HttpSinkConfig(configBase); From b7f1375767c90005158fad642a79ea8d3cd37098 Mon Sep 17 00:00:00 2001 From: Cararus Eugeniu Date: Tue, 31 Oct 2023 09:47:40 +0000 Subject: [PATCH 4/4] Update DefaultHttpSenderTest.java Fix Linter reported issue --- .../aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index 6ea5db7..b5ddf33 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -25,10 +25,10 @@ import java.util.Map; import io.aiven.kafka.connect.http.config.HttpMethodsType; - import io.aiven.kafka.connect.http.config.HttpSinkConfig; import org.apache.kafka.connect.errors.ConnectException; + import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith;