From 630d5c5b5aebce997b7acbf680dd2913b5fc0fcd Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 20 Dec 2024 12:16:09 +0100 Subject: [PATCH] Fixed HTTP/2 serialization in HttpReceiverOverHTTP2. Fixed reset race in HTTP2Stream. Signed-off-by: Simone Bordet --- .../client/AuthenticationProtocolHandler.java | 7 --- .../jetty/client/RedirectProtocolHandler.java | 5 +- .../jetty/client/transport/HttpReceiver.java | 60 +++++++++---------- .../eclipse/jetty/client/HttpClientTest.java | 15 +++-- .../jetty/client/HttpRequestAbortTest.java | 20 +++---- .../internal/HttpReceiverOverHTTP2.java | 19 ++++-- .../org/eclipse/jetty/http2/HTTP2Stream.java | 34 ++++++----- .../transport/ThreadStarvationTest.java | 11 +++- .../jetty/util/thread/SerializedInvoker.java | 15 +++-- 9 files changed, 101 insertions(+), 85 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 79302aa1b59c..aeb72b8bd13e 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -136,13 +136,6 @@ public void onComplete(Result result) { HttpRequest request = (HttpRequest)result.getRequest(); ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding()); - if (result.getResponseFailure() != null) - { - if (LOG.isDebugEnabled()) - LOG.debug("Authentication challenge failed", result.getFailure()); - forwardFailureComplete(request, result.getRequestFailure(), response, result.getResponseFailure()); - return; - } String authenticationAttribute = getAuthenticationAttribute(); HttpConversation conversation = request.getConversation(); diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java index 56692b7a2003..7479c1e76fa2 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java @@ -70,9 +70,6 @@ public void onComplete(Result result) { Request request = result.getRequest(); Response response = result.getResponse(); - if (result.getResponseFailure() == null) - redirector.redirect(request, response, null); - else - redirector.fail(request, response, result.getFailure()); + redirector.redirect(request, response, null); } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 79353893d812..5641b1aaa2c4 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -157,7 +157,7 @@ protected void responseBegin(HttpExchange exchange) responseState = ResponseState.BEGIN; HttpResponse response = exchange.getResponse(); HttpConversation conversation = exchange.getConversation(); - // Probe the protocol handlers + // Probe the protocol handlers. HttpClient client = getHttpDestination().getHttpClient(); ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response); Response.Listener handlerListener = null; @@ -170,7 +170,7 @@ protected void responseBegin(HttpExchange exchange) conversation.updateResponseListeners(handlerListener); if (LOG.isDebugEnabled()) - LOG.debug("Response begin {}", response); + LOG.debug("Notifying response begin for {} on {}", exchange, this); conversation.getResponseListeners().notifyBegin(response); }); } @@ -189,12 +189,12 @@ protected void responseBegin(HttpExchange exchange) protected void responseHeader(HttpExchange exchange, HttpField field) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseHeader for {} on {}", field, this); + LOG.debug("Invoking responseHeader {} for {} on {}", field, exchange, this); invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseHeader on {}", this); + LOG.debug("Executing responseHeader for {} on {}", exchange, this); if (exchange.isResponseCompleteOrTerminated()) return; @@ -202,10 +202,10 @@ protected void responseHeader(HttpExchange exchange, HttpField field) responseState = ResponseState.HEADER; HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Notifying header {}", field); + LOG.debug("Notifying response header {} for {} on {}", field, exchange, this); boolean process = exchange.getConversation().getResponseListeners().notifyHeader(response, field); if (LOG.isDebugEnabled()) - LOG.debug("Header {} notified, {}processing needed", field, (process ? "" : "no ")); + LOG.debug("Notified response header {}, processing {}", field, (process ? "needed" : "skipped")); if (process) { response.addHeader(field); @@ -241,12 +241,12 @@ protected void storeCookie(URI uri, HttpField field) protected void responseHeaders(HttpExchange exchange) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseHeaders on {}", this); + LOG.debug("Invoking responseHeaders for {} on {}", exchange, this); invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseHeaders on {}", this); + LOG.debug("Executing responseHeaders for {} on {}", exchange, this); if (exchange.isResponseCompleteOrTerminated()) return; @@ -288,6 +288,8 @@ protected void responseHeaders(HttpExchange exchange) } } + if (LOG.isDebugEnabled()) + LOG.debug("Notifying response headers for {} on {}", exchange, this); ResponseListeners responseListeners = exchange.getConversation().getResponseListeners(); responseListeners.notifyHeaders(response); @@ -298,6 +300,7 @@ protected void responseHeaders(HttpExchange exchange) { if (LOG.isDebugEnabled()) LOG.debug("Interim response status {}, succeeding", response.getStatus()); + // TODO: explain it's queued. responseSuccess(exchange, this::onInterim); return; } @@ -311,12 +314,12 @@ protected void responseHeaders(HttpExchange exchange) if (decoderFactory != null) { if (LOG.isDebugEnabled()) - LOG.debug("Decoding {} response content", decoderFactory.getEncoding()); + LOG.debug("Decoding {} response content for {} on {}", decoderFactory.getEncoding(), exchange, this); contentSource = new DecodedContentSource(decoderFactory.newDecoderContentSource(rawContentSource), response); } if (LOG.isDebugEnabled()) - LOG.debug("Response content {} {}", response, contentSource); + LOG.debug("Notifying response content {} for {} on {}", contentSource, exchange, this); responseListeners.notifyContentSource(response, contentSource); }); } @@ -327,21 +330,22 @@ protected void responseHeaders(HttpExchange exchange) * This method takes care of ensuring the {@link Content.Source} passed to * {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)} * calls the demand callback. - * The call to the demand callback is serialized with other events. */ protected void responseContentAvailable(HttpExchange exchange) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseContentAvailable on {}", this); + LOG.debug("Invoking responseContentAvailable for {} on {}", exchange, this); invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseContentAvailable on {}", this); + LOG.debug("Executing responseContentAvailable for {} on {}", exchange, this); if (exchange.isResponseCompleteOrTerminated()) return; + if (LOG.isDebugEnabled()) + LOG.debug("Notifying data available for {} on {}", exchange, this); rawContentSource.onDataAvailable(); }); } @@ -358,7 +362,7 @@ protected void responseContentAvailable(HttpExchange exchange) protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseSuccess on {}", this); + LOG.debug("Invoking responseSuccess for {} on {}", exchange, this); // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. @@ -368,7 +372,7 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) Runnable successTask = () -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseSuccess on {}", this); + LOG.debug("Executing responseSuccess for {} on {}", exchange, this); responseState = ResponseState.IDLE; @@ -376,7 +380,7 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Response success {}", response); + LOG.debug("Notifying response success for {} on {}", exchange, this); exchange.getConversation().getResponseListeners().notifySuccess(response); // Interim responses do not terminate the exchange. @@ -403,11 +407,11 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) */ protected void responseFailure(Throwable failure, Promise promise) { - if (LOG.isDebugEnabled()) - LOG.debug("Failing with {} on {}", failure, this); - HttpExchange exchange = getHttpExchange(); + if (LOG.isDebugEnabled()) + LOG.debug("Response failure {} for {} on {}", failure, exchange, this); + // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. if (exchange != null && exchange.responseComplete(failure)) @@ -492,7 +496,7 @@ private void cleanup() public void abort(HttpExchange exchange, Throwable failure, Promise promise) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking abort with {} on {}", failure, this); + LOG.debug("Invoking abort for {} on {}", exchange, this, failure); if (!exchange.isResponseCompleteOrTerminated()) throw new IllegalStateException(); @@ -510,13 +514,15 @@ public void abort(HttpExchange exchange, Throwable failure, Promise pro responseState = ResponseState.FAILURE; this.failure = failure; + if (contentSource != null) contentSource.fail(failure); + dispose(); HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Response abort {} {} on {}", response, exchange, getHttpChannel(), failure); + LOG.debug("Notifying response failure {} for {} on {}", failure, exchange, this); exchange.getConversation().getResponseListeners().notifyFailure(response, failure); // Mark atomically the response as terminated, with @@ -700,10 +706,6 @@ public boolean rewind() } } - /** - * This Content.Source implementation guarantees that all {@link #read(boolean)} calls - * happening from a {@link #demand(Runnable)} callback must be serialized. - */ private class ContentSource implements Content.Source, Invocable { private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class); @@ -761,9 +763,9 @@ private void onDataAvailable() public InvocationType getInvocationType() { Runnable demandCallback = demandCallbackRef.get(); - if (demandCallback == null) - return Invocable.getInvocationType(getHttpChannel().getConnection()); - return Invocable.getInvocationType(demandCallback); + if (demandCallback != null) + return Invocable.getInvocationType(demandCallback); + return Invocable.getInvocationType(getHttpChannel().getConnection()); } @Override @@ -775,8 +777,6 @@ public void demand(Runnable demandCallback) throw new IllegalArgumentException(); if (!demandCallbackRef.compareAndSet(null, demandCallback)) throw new IllegalStateException(); - // The processDemand method may call HttpReceiver.read(boolean) - // so it must be called by the invoker. invoker.run(processDemand); } diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index 785115587631..2756be646191 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -180,14 +180,17 @@ public boolean handle(org.eclipse.jetty.server.Request request, org.eclipse.jett return true; } }); - client.setConnectBlocking(true); - ContentResponse response = client.GET(scenario.getScheme() + "://localhost:" + connector.getLocalPort()); - assertNotNull(response); - assertEquals(200, response.getStatus()); - byte[] content = response.getContent(); - assertArrayEquals(data, content); + for (int i = 0; i < 2; ++i) + { + ContentResponse response = client.GET(scenario.getScheme() + "://localhost:" + connector.getLocalPort()); + + assertNotNull(response); + assertEquals(200, response.getStatus()); + byte[] content = response.getContent(); + assertArrayEquals(data, content); + } } @ParameterizedTest diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java index 209ed24563a4..bdb023225718 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.client.transport.HttpDestination; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.logging.StacklessLogging; import org.eclipse.jetty.server.Handler; @@ -498,20 +499,19 @@ public boolean handle(org.eclipse.jetty.server.Request request, Response respons @Override public void onComplete(Result result) { - // Fake the fact that the redirect failed. + // Fake the fact that the redirect failed, + // but the redirect should still be followed. Result newResult = new Result(result, cause); super.onComplete(newResult); } }); - ExecutionException e = assertThrows(ExecutionException.class, () -> - { - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scenario.getScheme()) - .path("/redirect") - .timeout(5, TimeUnit.SECONDS) - .send(); - }); - assertSame(cause, e.getCause()); + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .path("/redirect") + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertEquals(HttpStatus.OK_200, response.getStatus()); } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java index 942f51e5b7f1..26a8281e3c5e 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java @@ -15,6 +15,7 @@ import java.io.EOFException; import java.io.IOException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; @@ -43,6 +44,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.SerializedInvoker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +52,13 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class); + private final SerializedInvoker invoker; + public HttpReceiverOverHTTP2(HttpChannel channel) { super(channel); + Executor executor = channel.getHttpDestination().getHttpClient().getExecutor(); + invoker = new SerializedInvoker(HttpReceiverOverHTTP2.class.getName(), executor); } @Override @@ -140,7 +146,7 @@ private Runnable onResponse(Stream stream, HeadersFrame frame, Callback callback return null; } - return new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () -> + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () -> { MetaData.Response response = (MetaData.Response)frame.getMetaData(); HttpResponse httpResponse = exchange.getResponse(); @@ -174,7 +180,7 @@ private Runnable onResponse(Stream stream, HeadersFrame frame, Callback callback responseHeaders(exchange); callback.succeeded(); - }); + })); } private Runnable onTrailer(HeadersFrame frame, Callback callback) @@ -240,7 +246,7 @@ public Runnable onDataAvailable() HttpExchange exchange = getHttpExchange(); if (exchange == null) return null; - return new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange)); + return invoker.offer(new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange))); } @Override @@ -252,7 +258,8 @@ public Runnable onReset(ResetFrame frame, Callback callback) callback.succeeded(); return null; } - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + + return invoker.offer(() -> { int error = frame.getError(); IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" + error)); @@ -269,12 +276,12 @@ public Runnable onTimeout(TimeoutException failure, Promise promise) promise.succeeded(false); return null; } - return () -> promise.completeWith(exchange.getRequest().abort(failure)); + return invoker.offer(() -> promise.completeWith(exchange.getRequest().abort(failure))); } @Override public Runnable onFailure(Throwable failure, Callback callback) { - return () -> responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed)); + return invoker.offer(() -> responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed))); } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index aed5f11ae647..0e057809c4f8 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -443,15 +443,6 @@ private void onData(Data data) return; } - if (isReset()) - { - // Just drop the frame. - if (LOG.isDebugEnabled()) - LOG.debug("Data {} for already reset {}", data, this); - session.dataConsumed(this, data.frame().flowControlLength()); - return; - } - if (dataLength >= 0) { dataLength -= frame.remaining(); @@ -471,14 +462,29 @@ private void onData(Data data) private boolean offer(Data data) { - // Retain the data because it is stored for later use. - data.retain(); - boolean process; + boolean reset; + boolean process = false; try (AutoLock ignored = lock.lock()) { - process = dataQueue.isEmpty() && dataDemand; - dataQueue.offer(data); + reset = isReset(); + if (!reset) + { + process = dataQueue.isEmpty() && dataDemand; + // Retain the data because it is stored for later use. + data.retain(); + dataQueue.offer(data); + } + } + + if (reset) + { + // Drop the frame. + if (LOG.isDebugEnabled()) + LOG.debug("Data {} for already reset {}", data, this); + session.dataConsumed(this, data.frame().flowControlLength()); + return false; } + if (LOG.isDebugEnabled()) LOG.debug("Data {} notifying onDataAvailable() {} for {}", data, process, this); return process; diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java index 479d22f28d08..4a9ab3f8a248 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java @@ -166,11 +166,16 @@ public boolean handle(Request request, Response response, Callback callback) thr .timeout(2 * idleTimeout, TimeUnit.MILLISECONDS) .send(result -> { - // The response should arrive correctly, - // it is the request that failed. + // The response frames arrive for all protocols (on the network). + // It is the request that is failed because the request content was not sent. + // For HTTP/2 a RST_STREAM arrives to fail the request, but it likely fails + // the response too, by draining the queued DATA frames before they are read + // by the content listener. + // For the other protocols the request is failed by the total timeout. assertTrue(result.isFailed()); - assertNull(result.getResponseFailure()); assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, result.getResponse().getStatus()); + if (transportType != TransportType.H2C && transportType != TransportType.H2) + assertNull(result.getResponseFailure()); responseLatch.countDown(); }); diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java index 7ea7048c7dc4..20240cd3d3e3 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java @@ -110,7 +110,7 @@ public Runnable offer(Runnable task) if (task == null) { if (LOG.isDebugEnabled()) - LOG.debug("Offering task null, skipping it in {}", this); + LOG.debug("Offering task null, skipping it on {}", this); return null; } // The NamedRunnable logger is checked to make it possible to enable the nice task names in a debugger @@ -125,12 +125,17 @@ public Runnable offer(Runnable task) task = new NamedRunnable(task); } } + Link link = new Link(task); - if (LOG.isDebugEnabled()) - LOG.debug("Offering link {} of {}", link, this); Link penultimate = _tail.getAndSet(link); + boolean queued = penultimate != null; + + if (LOG.isDebugEnabled()) + LOG.debug("{} {} on {}", queued ? "Queued" : "Offered", link, this); + if (penultimate == null) return link; + penultimate._next.lazySet(link); return null; } @@ -247,7 +252,7 @@ public void run() while (link != null) { if (LOG.isDebugEnabled()) - LOG.debug("Running link {} of {}", link, SerializedInvoker.this); + LOG.debug("Running {} of {}", link, SerializedInvoker.this); Runnable task = link.getTask(); InvocationType currentInvocationType = link.getInvocationType(); @@ -270,7 +275,7 @@ public void run() catch (Throwable t) { if (LOG.isDebugEnabled()) - LOG.debug("Failed while running link {} of {}", link, SerializedInvoker.this, t); + LOG.debug("Failed while running {} of {}", link, SerializedInvoker.this, t); onError(task, t); } finally