diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java index 10b0db2a60ea..c07188acd0fa 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java @@ -147,24 +147,19 @@ interface ContentListener extends AsyncContentListener * Callback method invoked when the response content has been received, parsed and there is demand. * This method may be invoked multiple times, and the {@code content} buffer * must be consumed (or copied) before returning from this method. + * This method is also always invoked when content arrives as demand is automatically registered on return. * * @param response the response containing the response line data and the headers * @param content the content bytes received + * @throws Exception an uncaught exception will abort the response and eventually reclaim the ByteBuffer, if applicable */ - void onContent(Response response, ByteBuffer content); + void onContent(Response response, ByteBuffer content) throws Exception; @Override - default void onContent(Response response, Content.Chunk chunk, Runnable demander) + default void onContent(Response response, Content.Chunk chunk, Runnable demander) throws Exception { - try - { - onContent(response, chunk.getByteBuffer()); - demander.run(); - } - catch (Throwable x) - { - response.abort(x); - } + onContent(response, chunk.getByteBuffer()); + demander.run(); } } @@ -175,7 +170,19 @@ default void onContent(Response response, Content.Chunk chunk, Runnable demander */ interface AsyncContentListener extends ContentSourceListener { - void onContent(Response response, Content.Chunk chunk, Runnable demander); + /** + * Callback method invoked when the response content has been received, parsed and there is demand. + * The {@code chunk} must be consumed, copied, or retained before returning from this method as + * it is then automatically released. + * The {@code demander} must be run before this method may be invoked again. + * + * @param response the response containing the response line data and the headers + * @param chunk the chunk received + * @param demander the runnable to be run to demand the next chunk + * @throws Exception an uncaught exception will abort the response, release the chunk and fail the content source + * from which the chunk was read from + */ + void onContent(Response response, Content.Chunk chunk, Runnable demander) throws Exception; @Override default void onContentSource(Response response, Content.Source contentSource) diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncContentListenerTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncContentListenerTest.java index 01d302fa4754..1f74440ade43 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncContentListenerTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncContentListenerTest.java @@ -33,6 +33,31 @@ public class AsyncContentListenerTest { + @Test + public void testOnContentThrowingException() + { + TestSource originalSource = new TestSource( + Content.Chunk.from(ByteBuffer.wrap(new byte[] {1}), false), + Content.Chunk.from(ByteBuffer.wrap(new byte[] {2}), false), + Content.Chunk.from(ByteBuffer.wrap(new byte[] {3}), true) + ); + Response.AsyncContentListener asyncContentListener = (response, chunk, demander) -> + { + throw new NumberFormatException(); + }; + + HttpResponse response = new HttpResponse(new HttpRequest(new HttpClient(), new HttpConversation(), URI.create("http://localhost"))); + asyncContentListener.onContentSource(response, originalSource); + + // Assert that the source was failed. + Content.Chunk lastChunk = originalSource.read(); + assertThat(Content.Chunk.isFailure(lastChunk, true), is(true)); + assertThat(lastChunk.getFailure(), instanceOf(NumberFormatException.class)); + + // Assert that the response was aborted. + assertThat(response.getRequest().getAbortCause(), instanceOf(NumberFormatException.class)); + } + @Test public void testTransientFailureBecomesTerminal() { @@ -70,7 +95,7 @@ public void testTransientFailureBecomesTerminal() originalSource.close(); } - private static class TestSource extends ChunksContentSource implements Closeable + static class TestSource extends ChunksContentSource implements Closeable { private Content.Chunk[] chunks; diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/ContentListenerTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/ContentListenerTest.java new file mode 100644 index 000000000000..c579f7250112 --- /dev/null +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/ContentListenerTest.java @@ -0,0 +1,56 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.net.URI; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.client.AsyncContentListenerTest.TestSource; +import org.eclipse.jetty.client.transport.HttpConversation; +import org.eclipse.jetty.client.transport.HttpRequest; +import org.eclipse.jetty.client.transport.HttpResponse; +import org.eclipse.jetty.io.Content; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +public class ContentListenerTest +{ + @Test + public void testOnContentThrowingException() + { + TestSource originalSource = new TestSource( + Content.Chunk.from(ByteBuffer.wrap(new byte[] {1}), false), + Content.Chunk.from(ByteBuffer.wrap(new byte[] {2}), false), + Content.Chunk.from(ByteBuffer.wrap(new byte[] {3}), true) + ); + Response.ContentListener contentListener = (response, content) -> + { + throw new NumberFormatException(); + }; + + HttpResponse response = new HttpResponse(new HttpRequest(new HttpClient(), new HttpConversation(), URI.create("http://localhost"))); + contentListener.onContentSource(response, originalSource); + + // Assert that the source was failed. + Content.Chunk lastChunk = originalSource.read(); + assertThat(Content.Chunk.isFailure(lastChunk, true), is(true)); + assertThat(lastChunk.getFailure(), instanceOf(NumberFormatException.class)); + + // Assert that the response was aborted. + assertThat(response.getRequest().getAbortCause(), instanceOf(NumberFormatException.class)); + } +}