Skip to content

Commit

Permalink
Fixes #8887 - Jetty-12 client calls onDataAvailable with producing th…
Browse files Browse the repository at this point in the history
…read.

Now the calls to the upper layer produce tasks that are fed to the ExecutionFactory in HTTP2Connection.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Feb 21, 2024
1 parent d02406c commit 74403ca
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void abort(HttpExchange exchange, Throwable requestFailure, Throwable res
else
responsePromise.succeeded(false);

requestPromise.thenAcceptBoth(responsePromise, (requestAborted, responseAborted) -> promise.succeeded(requestAborted || responseAborted));
promise.completeWith(requestPromise.thenCombine(responsePromise, (requestAborted, responseAborted) -> requestAborted || responseAborted));
}

public void abortResponse(HttpExchange exchange, Throwable failure, Promise<Boolean> promise)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.HTTP2ClientConnectionFactory;
Expand Down Expand Up @@ -168,9 +169,9 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<Stri
return factory.newConnection(endPoint, context);
}

protected Connection newConnection(Destination destination, Session session)
protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection)
{
return new HttpConnectionOverHTTP2(destination, session);
return new HttpConnectionOverHTTP2(destination, session, connection);
}

protected void onClose(Connection connection, GoAwayFrame frame)
Expand All @@ -186,9 +187,9 @@ private SessionListenerPromise(Map<String, Object> context)
}

@Override
protected Connection newConnection(Destination destination, Session session)
protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection)
{
return HttpClientTransportOverHTTP2.this.newConnection(destination, session);
return HttpClientTransportOverHTTP2.this.newConnection(destination, session, connection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@

import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Channel;
import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.HTTP2StreamEndPoint;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,26 +38,51 @@ public ClientHTTP2StreamEndPoint(HTTP2Stream stream)
}

@Override
public void onDataAvailable()
public Runnable onDataAvailable()
{
processDataAvailable();
// The InvocationType may change depending on the read callback.
return new Invocable.ReadyTask(getInvocationType(), this::processDataAvailable);
}

@Override
public void onTimeout(TimeoutException timeout, Promise<Boolean> promise)
public Runnable onReset(ResetFrame frame, Callback callback)
{
int error = frame.getError();
EofException failure = new EofException(ErrorCode.toString(error, "error_code_" + error));
return onFailure(failure, callback);
}

@Override
public Runnable onTimeout(TimeoutException timeout, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("idle timeout on {}", this, timeout);
Connection connection = getConnection();
if (connection != null)
promise.succeeded(connection.onIdleExpired(timeout));
else
if (connection == null)
{
promise.succeeded(true);
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
{
boolean expire = connection.onIdleExpired(timeout);
if (expire)
{
processFailure(timeout);
close(timeout);
}
promise.succeeded(expire);
});
}

@Override
public void onFailure(Throwable failure, Callback callback)
public Runnable onFailure(Throwable failure, Callback callback)
{
callback.failed(failure);
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
{
processFailure(failure);
close(failure);
callback.failed(failure);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
Expand Down Expand Up @@ -72,7 +73,8 @@ public void onSettings(Session session, SettingsFrame frame)

private void onServerPreface(Session session)
{
HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination(), session);
HTTP2Connection http2Connection = (HTTP2Connection)context.get(HTTP2Connection.class.getName());
HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination(), session, http2Connection);
if (this.connection.compareAndSet(null, connection, false, true))
{
// The connection promise must be called synchronously
Expand All @@ -82,9 +84,9 @@ private void onServerPreface(Session session)
}
}

protected Connection newConnection(Destination destination, Session session)
protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection)
{
return new HttpConnectionOverHTTP2(destination, session);
return new HttpConnectionOverHTTP2(destination, session, connection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,29 +197,28 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
public void onDataAvailable(Stream stream)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
channel.onDataAvailable();
connection.offerTask(channel.onDataAvailable(), false);
}

@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
{
// TODO: needs to call HTTP2Channel?
receiver.onReset(frame);
callback.succeeded();
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onReset(frame, callback), false);
}

@Override
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
channel.onTimeout(x, promise);
connection.offerTask(channel.onTimeout(x, promise), false);
}

@Override
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
channel.onFailure(failure, callback);
connection.offerTask(channel.onFailure(failure, callback), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
Expand All @@ -57,12 +58,14 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private final HTTP2Connection connection;
private boolean recycleHttpChannels = true;

public HttpConnectionOverHTTP2(Destination destination, Session session)
public HttpConnectionOverHTTP2(Destination destination, Session session, HTTP2Connection connection)
{
super((HttpDestination)destination);
this.session = session;
this.connection = connection;
}

public Session getSession()
Expand Down Expand Up @@ -277,6 +280,12 @@ public boolean sweep()
return sweeps.incrementAndGet() >= 4;
}

void offerTask(Runnable task, boolean dispatch)
{
if (task != null)
connection.offerTask(task, dispatch);
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.Client
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class);

private final Runnable onDataAvailableTask = new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, this::responseContentAvailable);

public HttpReceiverOverHTTP2(HttpChannel channel)
{
super(channel);
Expand Down Expand Up @@ -202,37 +205,49 @@ Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
}

@Override
public void onDataAvailable()
public Runnable onDataAvailable()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;

responseContentAvailable();
return null;
return onDataAvailableTask;
}

void onReset(ResetFrame frame)
@Override
public Runnable onReset(ResetFrame frame, Callback callback)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
int error = frame.getError();
exchange.getRequest().abort(new IOException(ErrorCode.toString(error, "reset_code_" + error)));
{
callback.succeeded();
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
{
int error = frame.getError();
IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" + error));
callback.completeWith(exchange.getRequest().abort(failure));
});
}

@Override
public void onTimeout(TimeoutException failure, Promise<Boolean> promise)
public Runnable onTimeout(TimeoutException failure, Promise<Boolean> promise)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
exchange.abort(failure, Promise.from(aborted -> promise.succeeded(!aborted), promise::failed));
else
if (exchange == null)
{
promise.succeeded(false);
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
promise.completeWith(exchange.getRequest().abort(failure))
);
}

@Override
public void onFailure(Throwable failure, Callback callback)
public Runnable onFailure(Throwable failure, Callback callback)
{
responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed));
Promise<Boolean> promise = Promise.from(failed -> callback.succeeded(), callback::failed);
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseFailure(failure, promise));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
session.setStreamIdleTimeout(streamIdleTimeout);

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, endPoint, session, sessionPromise, listener);
context.put(HTTP2Connection.class.getName(), connection);
connection.addEventListener(connectionListener);
parser.init(connection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.function.BiConsumer;

import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;

Expand All @@ -33,11 +34,13 @@ public interface HTTP2Channel
*/
public interface Client
{
public void onDataAvailable();
public Runnable onDataAvailable();

public Runnable onReset(ResetFrame frame, Callback callback);

public void onTimeout(TimeoutException failure, Promise<Boolean> promise);
public Runnable onTimeout(TimeoutException failure, Promise<Boolean> promise);

public void onFailure(Throwable failure, Callback callback);
public Runnable onFailure(Throwable failure, Callback callback);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public boolean onIdleExpired(TimeoutException timeoutException)
return false;
}

protected void offerTask(Runnable task, boolean dispatch)
public void offerTask(Runnable task, boolean dispatch)
{
offerTask(task);
if (dispatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,12 @@ private void process()
callback.succeeded();
}

protected Invocable.InvocationType getInvocationType()
{
Callback callback = readCallback.get();
return callback == null ? Invocable.InvocationType.NON_BLOCKING : callback.getInvocationType();
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,6 @@ public boolean upgrade(Request request, HttpFields.Mutable responseFields)
return true;
}

// Overridden for visibility.
@Override
protected void offerTask(Runnable task, boolean dispatch)
{
super.offerTask(task, dispatch);
}

@Override
public String getId()
{
Expand Down
Loading

0 comments on commit 74403ca

Please sign in to comment.