diff --git a/LTS-CHANGELOG.adoc b/LTS-CHANGELOG.adoc index 7d7e4320f8..e657906da9 100644 --- a/LTS-CHANGELOG.adoc +++ b/LTS-CHANGELOG.adoc @@ -17,6 +17,13 @@ include::content/docs/variables.adoc-include[] The LTS changelog lists releases which are only accessible via a commercial subscription. All fixes and changes in LTS releases will be released the next minor release. Changes from LTS 1.4.x will be included in release 1.5.0. +[[v1.6.34]] +== 1.6.34 (TBD) + +icon:check[] Java Rest Client: The classes `MeshWebrootResponse` and `MeshWebrootFieldResponse` now extend `Closeable` and calling `close()` on implementations +will close the underlying response. See link:{{< relref "java-rest-client.asciidoc" >}}#_connection_leaks[Gentics Mesh Java Client] for examples of how to properly +close the responses. + [[v1.6.33]] == 1.6.33 (07.09.2022) diff --git a/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java b/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java index 7f7fd71053..ec72f79d29 100644 --- a/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java +++ b/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java @@ -57,6 +57,10 @@ public T getBody() { public List getCookies() { throw new RuntimeException("There are no cookies in local requests"); } + + @Override + public void close() { + } }); } diff --git a/core/src/test/java/com/gentics/mesh/rest/ConnectionLeakTest.java b/core/src/test/java/com/gentics/mesh/rest/ConnectionLeakTest.java new file mode 100644 index 0000000000..8ca032e3f1 --- /dev/null +++ b/core/src/test/java/com/gentics/mesh/rest/ConnectionLeakTest.java @@ -0,0 +1,545 @@ +package com.gentics.mesh.rest; + +import static com.gentics.mesh.test.ClientHelper.call; +import static com.gentics.mesh.test.TestDataProvider.PROJECT_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.io.IOUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.gentics.mesh.core.rest.node.NodeCreateRequest; +import com.gentics.mesh.core.rest.node.NodeResponse; +import com.gentics.mesh.core.rest.project.ProjectResponse; +import com.gentics.mesh.core.rest.schema.impl.BinaryFieldSchemaImpl; +import com.gentics.mesh.core.rest.schema.impl.SchemaCreateRequest; +import com.gentics.mesh.core.rest.schema.impl.SchemaResponse; +import com.gentics.mesh.rest.client.MeshBinaryResponse; +import com.gentics.mesh.rest.client.MeshWebrootFieldResponse; +import com.gentics.mesh.rest.client.MeshWebrootResponse; +import com.gentics.mesh.test.ConnectionVerifier; +import com.gentics.mesh.test.TestSize; +import com.gentics.mesh.test.context.AbstractMeshTest; +import com.gentics.mesh.test.context.MeshTestContext; +import com.gentics.mesh.test.context.MeshTestSetting; + +import io.vertx.core.buffer.Buffer; + +/** + * Tests for possible resource leaks when using the MeshRestClient + */ +@MeshTestSetting(testSize = TestSize.FULL, startServer = true) +public class ConnectionLeakTest extends AbstractMeshTest { + public final static String BINARY_SCHEMA_NAME = "binary_schema"; + + public final static String BINARY_FIELD_NAME = "binary"; + + public final static String CONTENT_TYPE = "image/png"; + + public final static String FILENAME = "somefile.png"; + + public final static String WEBROOT_PATH = "/" + FILENAME; + + protected NodeResponse node; + + protected int uploadSize; + + @Rule + public ConnectionVerifier connectionVerifier = new ConnectionVerifier(MeshTestContext.okHttp); + + /** + * Setup everything required for the test + * @throws IOException + */ + @Before + public void setup() throws IOException { + setupBinarySchema(); + setupBinaryData(); + } + + /** + * Setup the binary schema + */ + public void setupBinarySchema() { + SchemaCreateRequest request = new SchemaCreateRequest(); + request.setName(BINARY_SCHEMA_NAME); + request.setFields(Arrays.asList(new BinaryFieldSchemaImpl().setName(BINARY_FIELD_NAME))); + request.setSegmentField(BINARY_FIELD_NAME); + SchemaResponse schemaResponse = client().createSchema(request).blockingGet(); + client().assignSchemaToProject(PROJECT_NAME, schemaResponse.getUuid()).blockingAwait(); + } + + /** + * Prepare the binary data for testing + * @throws IOException + */ + public void setupBinaryData() throws IOException { + ProjectResponse project = call(() -> client().findProjectByName(PROJECT_NAME)); + node = call(() -> client().createNode(PROJECT_NAME, new NodeCreateRequest().setLanguage("en") + .setParentNodeUuid(project.getRootNode().getUuid()).setSchemaName(BINARY_SCHEMA_NAME))); + + InputStream ins = getClass().getResourceAsStream("/pictures/blume.jpg"); + byte[] bytes = IOUtils.toByteArray(ins); + Buffer buffer = Buffer.buffer(bytes); + + node = call(() -> client().updateNodeBinaryField(PROJECT_NAME, node.getUuid(), "en", "draft", BINARY_FIELD_NAME, + new ByteArrayInputStream(buffer.getBytes()), buffer.length(), + FILENAME, CONTENT_TYPE)); + uploadSize = buffer.length(); + } + + // Test cases for node requests (non-blocking) + + /** + * Test loading a node + * @throws InterruptedException + */ + @Test + public void testNode() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().findNodeByUuid(PROJECT_NAME, node.getUuid()).toSingle().doFinally(() -> { + latch.countDown(); + }).subscribe(); + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + /** + * Test error in consumption of response while loading a node + * @throws InterruptedException + */ + @Test + public void testNodeErrorWhileConsuming() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().findNodeByUuid(PROJECT_NAME, node.getUuid()).toSingle().doOnSuccess(response -> { + throw new RuntimeException("Something bad happens here"); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + // Test cases for node requests (blocking) + + /** + * Test loading a node in blocking manner + * @throws InterruptedException + */ + @Test + public void testNodeBlocking() throws InterruptedException { + client().findNodeByUuid(PROJECT_NAME, node.getUuid()).toSingle().blockingGet(); + } + + // Test cases for binary field (non-blocking) + + /** + * Test normal download of binary field + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBinaryField() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger downloadSize = new AtomicInteger(); + client().downloadBinaryField(PROJECT_NAME, node.getUuid(), "en", BINARY_FIELD_NAME).toSingle() + .flatMapObservable(binResponse -> binResponse.getFlowable().toObservable().doOnComplete(() -> binResponse.close())).doOnNext(bytes -> { + downloadSize.addAndGet(bytes.length); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + assertThat(downloadSize.get()).as("Downloaded bytes").isEqualTo(uploadSize); + } + + /** + * Test requesting but not downloading binary field + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBinaryFieldNotConsumed() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().downloadBinaryField(PROJECT_NAME, node.getUuid(), "en", BINARY_FIELD_NAME).toSingle() + .doAfterSuccess(response -> { + // note: we explicitly need to close the response here + response.close(); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + /** + * Test error while consuming response for binary field + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBinaryFieldErrorWhileConsuming() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().downloadBinaryField(PROJECT_NAME, node.getUuid(), "en", BINARY_FIELD_NAME).toSingle().doAfterSuccess(response -> { + // note: we explicitly need to close the response here + response.close(); + }).doOnSuccess(response -> { + throw new RuntimeException("Something bad happens here"); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + /** + * Test error while consuming binary data for binary field + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBinaryFieldErrorWhileConsumingData() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().downloadBinaryField(PROJECT_NAME, node.getUuid(), "en", BINARY_FIELD_NAME).toSingle() + .flatMapObservable(binResponse -> binResponse.getFlowable().toObservable()).doOnNext(bytes -> { + throw new RuntimeException("Something bad happens here"); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + // test cases for binary field (blocking) + + /** + * Test downloading binary field in a blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBinaryFieldBlocking() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger downloadSize = new AtomicInteger(); + MeshBinaryResponse response = client() + .downloadBinaryField(PROJECT_NAME, node.getUuid(), "en", BINARY_FIELD_NAME).toSingle().blockingGet(); + response.getFlowable().doOnNext(bytes -> { + downloadSize.addAndGet(bytes.length); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + assertThat(downloadSize.get()).as("Downloaded bytes").isEqualTo(uploadSize); + } + + /** + * Test requesting but not consuming binary field in a blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBinaryFieldBlockingNotConsumed() throws IOException, InterruptedException { + // not we need to get the response with try-with-resources, so it automatically gets closed + try (MeshBinaryResponse response = client() + .downloadBinaryField(PROJECT_NAME, node.getUuid(), "en", BINARY_FIELD_NAME).toSingle().blockingGet()) { + } + } + + /** + * Test error while downloading binary data from binary field in a blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBinaryFieldBlockingErrorWhileConsumingData() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + MeshBinaryResponse response = client() + .downloadBinaryField(PROJECT_NAME, node.getUuid(), "en", BINARY_FIELD_NAME).toSingle().blockingGet(); + + response.getFlowable().doOnNext(bytes -> { + throw new RuntimeException("Something bad happens here"); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + // test cases for webroot request (non-blocking) + + /** + * Test normal download of binary data over webroot + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebroot() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger downloadSize = new AtomicInteger(); + client().webroot(PROJECT_NAME, WEBROOT_PATH).toSingle().doFinally(() -> { + latch.countDown(); + }).subscribe(response -> { + if (response.isBinary()) { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + downloadSize.addAndGet(bytes.length); + }).subscribe(); + } + }); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + assertThat(downloadSize.get()).as("Downloaded bytes").isEqualTo(uploadSize); + } + + /** + * Test requesting but not downloading over webroot + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootNotConsumed() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().webroot(PROJECT_NAME, WEBROOT_PATH).toSingle().doAfterSuccess(response -> { + // note: we explicitly need to close the response here + response.close(); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + /** + * Test error while consuming response when downloading via webroot + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootErrorWhileConsuming() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().webroot(PROJECT_NAME, WEBROOT_PATH).toSingle().doAfterSuccess(response -> { + // note: we explicitly need to close the response here + response.close(); + }).doOnSuccess(response -> { + throw new RuntimeException("Something bad happens here"); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + /** + * Test error while consuming binary data when downloading via webroot + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootErrorWhileConsumingData() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().webroot(PROJECT_NAME, WEBROOT_PATH).toSingle().doOnSuccess(response -> { + if (response.isBinary()) { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + throw new RuntimeException("Something bad happens here"); + }).subscribe(); + } + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + // test cases for webroot request (blocking) + + /** + * Test normal download of binary data over webroot in blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootBlocking() throws IOException, InterruptedException { + AtomicInteger downloadSize = new AtomicInteger(); + + MeshWebrootResponse response = client().webroot(PROJECT_NAME, WEBROOT_PATH).toSingle().blockingGet(); + if (response.isBinary()) { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + downloadSize.addAndGet(bytes.length); + }).blockingSubscribe(); + } + assertThat(downloadSize.get()).as("Downloaded bytes").isEqualTo(uploadSize); + } + + /** + * Test requesting but not downloading over webroot in blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootBlockingNotConsumed() throws IOException, InterruptedException { + // not we need to get the response with try-with-resources, so it automatically gets closed + try (MeshWebrootResponse response = client().webroot(PROJECT_NAME, WEBROOT_PATH).toSingle().blockingGet()) { + } + } + + /** + * Test error while consuming binary data when downloading via webroot in blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootBlockingErrorWhileConsumingData() throws IOException, InterruptedException { + MeshWebrootResponse response = client().webroot(PROJECT_NAME, WEBROOT_PATH).toSingle().blockingGet(); + + if (response.isBinary()) { + try { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + throw new RuntimeException("Something bad happens here"); + }).blockingSubscribe(); + } catch (RuntimeException ignored) { + } + } + } + + // test cases for webrootfield request (non-blocking) + + /** + * Test normal download of binary data over webrootfield + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootField() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger downloadSize = new AtomicInteger(); + client().webrootField(PROJECT_NAME, BINARY_FIELD_NAME, WEBROOT_PATH).toSingle().doFinally(() -> { + latch.countDown(); + }).subscribe(response -> { + if (response.isBinary()) { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + downloadSize.addAndGet(bytes.length); + }).subscribe(); + } + }); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + assertThat(downloadSize.get()).as("Downloaded bytes").isEqualTo(uploadSize); + } + + /** + * Test requesting but not downloading over webrootfield + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootFieldNotConsumed() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().webrootField(PROJECT_NAME, BINARY_FIELD_NAME, WEBROOT_PATH).toSingle().doAfterSuccess(response -> { + // note: we explicitly need to close the response here + response.close(); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + /** + * Test error while consuming response when downloading via webrootfield + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootFieldErrorWhileConsuming() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().webrootField(PROJECT_NAME, BINARY_FIELD_NAME, WEBROOT_PATH).toSingle().doAfterSuccess(response -> { + // note: we explicitly need to close the response here + response.close(); + }).doOnSuccess(response -> { + throw new RuntimeException("Something bad happens here"); + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + /** + * Test error while consuming binary data when downloading via webrootfield + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootFieldErrorWhileConsumingData() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + client().webrootField(PROJECT_NAME, BINARY_FIELD_NAME, WEBROOT_PATH).toSingle().doOnSuccess(response -> { + if (response.isBinary()) { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + throw new RuntimeException("Something bad happens here"); + }).subscribe(); + } + }).doFinally(() -> { + latch.countDown(); + }).subscribe(); + + assertThat(latch.await(1, TimeUnit.MINUTES)).as("Call ended in time").isTrue(); + } + + // test cases for webrootfield request (blocking) + + /** + * Test normal download of binary data over webrootfield in blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootFieldBlocking() throws IOException, InterruptedException { + AtomicInteger downloadSize = new AtomicInteger(); + + MeshWebrootFieldResponse response = client().webrootField(PROJECT_NAME, BINARY_FIELD_NAME, WEBROOT_PATH).toSingle().blockingGet(); + if (response.isBinary()) { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + downloadSize.addAndGet(bytes.length); + }).blockingSubscribe(); + } + assertThat(downloadSize.get()).as("Downloaded bytes").isEqualTo(uploadSize); + } + + /** + * Test requesting but not downloading over webrootfield in blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootFieldBlockingNotConsumed() throws IOException, InterruptedException { + // not we need to get the response with try-with-resources, so it automatically gets closed + try (MeshWebrootFieldResponse response = client().webrootField(PROJECT_NAME, BINARY_FIELD_NAME, WEBROOT_PATH).toSingle().blockingGet()) { + } + } + + /** + * Test error while consuming binary data when downloading via webrootfield in blocking manner + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testWebrootFieldBlockingErrorWhileConsumingData() throws IOException, InterruptedException { + MeshWebrootFieldResponse response = client().webrootField(PROJECT_NAME, BINARY_FIELD_NAME, WEBROOT_PATH).toSingle().blockingGet(); + + if (response.isBinary()) { + try { + response.getBinaryResponse().getFlowable().doOnNext(bytes -> { + throw new RuntimeException("Something bad happens here"); + }).blockingSubscribe(); + } catch (RuntimeException ignored) { + } + } + } +} diff --git a/doc/src/main/docs/java-rest-client.asciidoc b/doc/src/main/docs/java-rest-client.asciidoc index 2518bea36d..cca0ca8101 100644 --- a/doc/src/main/docs/java-rest-client.asciidoc +++ b/doc/src/main/docs/java-rest-client.asciidoc @@ -96,6 +96,48 @@ Method to set the hostname verification checks: * `MeshRestClientConfig.Builder#setHostnameVerification(boolean flag)` +=== Connection Leaks + +In most cases, the connection to Mesh will implicitly be closed when the response object is created. The only exceptions are when loading binary data or doing a webroot or webrootField request (where the response might contain binary data). + +For those requests, it is important to make sure, that the response is properly closed in all cases, especially when the response data is not (fully) consumed (e.g. when an error happens while consuming the binary data). + +Incorrect example: +[source,java] +---- +client.downloadBinaryField("demo", "01ecd6048ee21471bb90af6deea40d2c", "en", "image") + .toSingle() + .doOnSuccess(response -> { + // an exception might be thrown here, which would leave the response open + }) + .subscribe(); +---- + +Better example: +[source,java] +---- +client.downloadBinaryField("demo", "01ecd6048ee21471bb90af6deea40d2c", "en", "image") + .toSingle() + .doAfterSuccess(response -> { + // we explicitly need to close the response here + response.close(); + }) + .doOnSuccess(response -> { + // an exception might be thrown here + }) + .subscribe(); +---- + +Example with blocking code: +[source,java] +---- +// try-with-resource will make sure, that close() is called on the response +try (MeshBinaryResponse response = client.downloadBinaryField("demo", "01ecd6048ee21471bb90af6deea40d2c", "en", "image") + .toSingle().blockingGet()) { + // an exception might be thrown here +} +---- + == Monitoring Client The monitoring client can be used to interact with the link:{{< relref "monitoring.asciidoc" >}}#_endpoints[Monitoring Endpoints]. diff --git a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshBinaryResponse.java b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshBinaryResponse.java index 014d416129..66dd75d31f 100644 --- a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshBinaryResponse.java +++ b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshBinaryResponse.java @@ -23,7 +23,7 @@ public interface MeshBinaryResponse extends Closeable { * @return */ default Flowable getFlowable() { - return Flowable.defer(() -> { + Flowable f = Flowable.defer(() -> { InputStream stream = getStream(); return Flowable.generate(emitter -> { byte[] buffer = new byte[FLOWABLE_BUFFER_SIZE]; @@ -39,6 +39,8 @@ default Flowable getFlowable() { } }); }); + + return f.doFinally(() -> close()); } /** diff --git a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshResponse.java b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshResponse.java index 4ffe667112..0bff821126 100644 --- a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshResponse.java +++ b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshResponse.java @@ -1,11 +1,19 @@ package com.gentics.mesh.rest.client; +import java.io.Closeable; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -public interface MeshResponse { +/** + * Common interface for Gentics Mesh REST Client responses. + * + * @param + * Response type + * @implNote It is important to close the response by calling {@link #close()} when the response is no longer needed. Failing to do so might lead to a connection leak. + */ +public interface MeshResponse extends Closeable { /** * Retrieve the response headers * @return A map of all response headers @@ -62,4 +70,7 @@ default List getCookies() { * @return The body as the specified type */ T getBody(); + + @Override + void close(); } diff --git a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootFieldResponse.java b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootFieldResponse.java index 4d185a3197..c29257f219 100644 --- a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootFieldResponse.java +++ b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootFieldResponse.java @@ -1,12 +1,15 @@ package com.gentics.mesh.rest.client; +import java.io.Closeable; + /** * Response for {@link WebRootFieldClientMethods} * * @author plyhun * + * @implNote It is important to close the response by calling {@link #close()} when the response is no longer needed. Failing to do so might lead to a connection leak. */ -public interface MeshWebrootFieldResponse { +public interface MeshWebrootFieldResponse extends Closeable { /** * Tests if the response is binary data. @@ -63,4 +66,7 @@ public interface MeshWebrootFieldResponse { * @return */ String getResponseAsPlainText(); + + @Override + void close(); } diff --git a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootResponse.java b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootResponse.java index c7cd3c2fb9..33e515f501 100644 --- a/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootResponse.java +++ b/rest-client/src/main/java/com/gentics/mesh/rest/client/MeshWebrootResponse.java @@ -1,8 +1,15 @@ package com.gentics.mesh.rest.client; +import java.io.Closeable; + import com.gentics.mesh.core.rest.node.NodeResponse; -public interface MeshWebrootResponse { +/** + * Definition of a WebRootResponse. The webroot response is special since it can return JSON of the {@link NodeResponse} or a {@link MeshBinaryResponse} of the + * binary data of a binary field. This behaviour is controlled by the Accept header of the client request and the queried node (e.g. whether it uses a binary field for the segment). + * @implNote It is important to close the response by calling {@link #close()} when the response is no longer needed. Failing to do so might lead to a connection leak. + */ +public interface MeshWebrootResponse extends Closeable { /** * Tests if the response is binary data. @@ -31,4 +38,7 @@ public interface MeshWebrootResponse { * @return */ String getNodeUuid(); + + @Override + void close(); } diff --git a/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/MeshOkHttpRequestImpl.java b/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/MeshOkHttpRequestImpl.java index edb747e581..f1215f0e53 100644 --- a/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/MeshOkHttpRequestImpl.java +++ b/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/MeshOkHttpRequestImpl.java @@ -292,6 +292,11 @@ public String getBodyAsString() { public T getBody() { return body.get(); } + + @Override + public void close() { + Optional.ofNullable(response).map(Response::body).ifPresent(ResponseBody::close); + } }); } diff --git a/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootFieldResponse.java b/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootFieldResponse.java index 5bf8bb1406..5722fc0e3d 100644 --- a/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootFieldResponse.java +++ b/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootFieldResponse.java @@ -1,12 +1,13 @@ package com.gentics.mesh.rest.client.impl; +import static com.gentics.mesh.http.HttpConstants.TEXT_HTML_UTF8; +import static com.gentics.mesh.http.HttpConstants.TEXT_PLAIN_UTF8; import static com.gentics.mesh.http.MeshHeaders.WEBROOT_NODE_UUID; import static com.gentics.mesh.http.MeshHeaders.WEBROOT_RESPONSE_TYPE; -import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; -import static com.gentics.mesh.http.HttpConstants.TEXT_PLAIN_UTF8; -import static com.gentics.mesh.http.HttpConstants.TEXT_HTML_UTF8; import static com.gentics.mesh.rest.client.impl.Util.lazily; +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; +import java.util.Optional; import java.util.function.Supplier; import org.apache.commons.lang.StringUtils; @@ -17,6 +18,7 @@ import okhttp3.OkHttpClient; import okhttp3.Response; +import okhttp3.ResponseBody; /** * {@link OkHttpClient} implementation of {@link MeshWebrootFieldResponse}. @@ -87,5 +89,10 @@ public String getResponseAsPlainText() { return null; } return jsonStringResponse.get(); - } + } + + @Override + public void close() { + Optional.ofNullable(response).map(Response::body).ifPresent(ResponseBody::close); + } } diff --git a/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootResponse.java b/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootResponse.java index 5634995f6b..2692ee502d 100644 --- a/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootResponse.java +++ b/rest-client/src/main/java/com/gentics/mesh/rest/client/impl/OkHttpWebrootResponse.java @@ -1,19 +1,21 @@ package com.gentics.mesh.rest.client.impl; -import com.gentics.mesh.core.rest.node.NodeResponse; - -import com.gentics.mesh.json.JsonUtil; -import com.gentics.mesh.rest.client.MeshBinaryResponse; -import com.gentics.mesh.rest.client.MeshWebrootResponse; -import okhttp3.Response; +import static com.gentics.mesh.http.MeshHeaders.WEBROOT_NODE_UUID; +import static com.gentics.mesh.http.MeshHeaders.WEBROOT_RESPONSE_TYPE; +import static com.gentics.mesh.rest.client.impl.Util.lazily; +import java.util.Optional; import java.util.function.Supplier; import org.apache.commons.lang.StringUtils; -import static com.gentics.mesh.http.MeshHeaders.WEBROOT_RESPONSE_TYPE; -import static com.gentics.mesh.http.MeshHeaders.WEBROOT_NODE_UUID; -import static com.gentics.mesh.rest.client.impl.Util.lazily; +import com.gentics.mesh.core.rest.node.NodeResponse; +import com.gentics.mesh.json.JsonUtil; +import com.gentics.mesh.rest.client.MeshBinaryResponse; +import com.gentics.mesh.rest.client.MeshWebrootResponse; + +import okhttp3.Response; +import okhttp3.ResponseBody; public class OkHttpWebrootResponse implements MeshWebrootResponse { @@ -52,4 +54,9 @@ public NodeResponse getNodeResponse() { ? null : nodeResponse.get(); } + + @Override + public void close() { + Optional.ofNullable(response).map(Response::body).ifPresent(ResponseBody::close); + } } diff --git a/test-common/src/main/java/com/gentics/mesh/test/ConnectionVerifier.java b/test-common/src/main/java/com/gentics/mesh/test/ConnectionVerifier.java new file mode 100644 index 0000000000..37a855f0a4 --- /dev/null +++ b/test-common/src/main/java/com/gentics/mesh/test/ConnectionVerifier.java @@ -0,0 +1,95 @@ +package com.gentics.mesh.test; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.rules.Verifier; + +import io.reactivex.Flowable; +import io.reactivex.disposables.Disposable; +import okhttp3.ConnectionPool; +import okhttp3.OkHttpClient; + +/** + * Testrule, that verifies that for the given OkHttpClient, the number of used + * connections in the pool do not change between creation of the instance (at + * start of the test) and end of the test + */ +public class ConnectionVerifier extends Verifier { + /** + * Connection pool to check + */ + protected ConnectionPool connectionPool; + + /** + * Number of used connections at creation time + */ + protected int usedConnectionsBeforeTest = 0; + + /** + * Timeout for waiting for the connection to be freed + */ + protected long timeout = 10; + + /** + * Unit for the waiting timeout + */ + protected TimeUnit timeoutUnit = TimeUnit.SECONDS; + + /** + * Create an instance for the given client + * @param client client + */ + public ConnectionVerifier(OkHttpClient client) { + this.connectionPool = client.connectionPool(); + usedConnectionsBeforeTest = connectionPool.connectionCount() - connectionPool.idleConnectionCount(); + } + + /** + * Set the timeout + * @param timeout maximum time to wait + * @param timeoutUnit unit + * @return fluent API + */ + public ConnectionVerifier withTimeout(long timeout, TimeUnit timeoutUnit) { + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + return this; + } + + @Override + protected void verify() throws Throwable { + waitForConnectionFreed(); + assertThat(getConnectionsUsedByTest()).as("Connections used (and not ended) by test").isEqualTo(0); + } + + /** + * Wait for the connection used by the test to become free + * @throws InterruptedException + */ + protected void waitForConnectionFreed() throws InterruptedException { + // now wait for the connection to be freed + CountDownLatch waitLatch = new CountDownLatch(1); + Disposable disp = Flowable.interval(100, TimeUnit.MILLISECONDS).forEach(ignore -> { + if (getConnectionsUsedByTest() == 0) { + waitLatch.countDown(); + } + }); + try { + waitLatch.await(timeout, timeoutUnit); + } finally { + disp.dispose(); + } + } + + /** + * Determine the number of connections used by the test + * @return number of used connections + */ + protected int getConnectionsUsedByTest() { + int usedConnectionsAfterTest = connectionPool.connectionCount() - connectionPool.idleConnectionCount(); + return usedConnectionsAfterTest - usedConnectionsBeforeTest; + } +}