Skip to content

Commit

Permalink
Cleanups extracted from delayed PR #11876
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Jun 25, 2024
1 parent f78f442 commit d055ee2
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ protected void doStart() throws Exception
protected void doStop() throws Exception
{
requestTimeouts.destroy();
abort(new AsynchronousCloseException());
abortExchanges(new AsynchronousCloseException());
Sweeper connectionPoolSweeper = client.getBean(Sweeper.class);
if (connectionPoolSweeper != null && connectionPool instanceof Sweeper.Sweepable)
connectionPoolSweeper.remove((Sweeper.Sweepable)connectionPool);
Expand Down Expand Up @@ -294,7 +294,7 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
abort(x);
abortExchanges(x);
}

@Override
Expand Down Expand Up @@ -513,7 +513,7 @@ public boolean remove(Connection connection)
*
* @param cause the abort cause
*/
public void abort(Throwable cause)
private void abortExchanges(Throwable cause)
{
// Copy the queue of exchanges and fail only those that are queued at this moment.
// The application may queue another request from the failure/complete listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,10 @@ protected Action process() throws Throwable
@Override
public void succeeded()
{
// TODO This logic should be moved to process() and/or onCompleteSuccess()
// Anything executed here is not mutually excluded from other threads in process() and/or onCompleteSuccess() and/or onCompleteFailure()
// nor is there a memory barrier.
// So, for example, two threads might try to release and null the chunk field at the same time.
boolean proceed = true;
if (committed)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public boolean append(HTTP2Session.Entry entry)
{
entries.offer(entry);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, entries={}", entry, entries.size());
LOG.debug("Appended {}, entries={}, {}", entry, entries.size(), this);
}
}
if (closed == null)
Expand All @@ -129,7 +129,7 @@ public boolean append(List<HTTP2Session.Entry> list)
{
list.forEach(entries::offer);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, entries={}", list, entries.size());
LOG.debug("Appended {}, entries={} {}", list, entries.size(), this);
}
}
if (closed == null)
Expand Down Expand Up @@ -158,7 +158,7 @@ public int getFrameQueueSize()
protected Action process() throws Throwable
{
if (LOG.isDebugEnabled())
LOG.debug("Flushing {}", session);
LOG.debug("process {} {}", session, this);

try (AutoLock ignored = lock.lock())
{
Expand All @@ -181,7 +181,7 @@ protected Action process() throws Throwable
if (pendingEntries.isEmpty())
{
if (LOG.isDebugEnabled())
LOG.debug("Flushed {}", session);
LOG.debug("Flushed {} {}", session, this);
return Action.IDLE;
}

Expand Down Expand Up @@ -254,7 +254,7 @@ protected Action process() throws Throwable
if (LOG.isDebugEnabled())
LOG.debug("Failure generating {}", entry, failure);
failed(failure);
return Action.SUCCEEDED;
return Action.SCHEDULED;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void send(MetaData.Request request, MetaData.Response response, boolean l
POST / HTTP/1.1
Host: localhost
Content-Length: 1
""";
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest(request))
{
Expand Down Expand Up @@ -234,7 +234,7 @@ public boolean handle(Request request, Response response, Callback callback)
POST / HTTP/1.1
Host: localhost
Content-Length: %d
%s
""".formatted(content.length(), content);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(request, 5, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -39,6 +40,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class EventsHandlerTest
{
Expand Down Expand Up @@ -117,13 +119,15 @@ public void testNanoTimestamps() throws Exception
{
AtomicReference<Long> beginNanoTime = new AtomicReference<>();
AtomicReference<Long> readyNanoTime = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
EventsHandler eventsHandler = new EventsHandler(new EchoHandler())
{
@Override
protected void onComplete(Request request, int status, HttpFields headers, Throwable failure)
{
beginNanoTime.set(request.getBeginNanoTime());
readyNanoTime.set(request.getHeadersNanoTime());
latch.countDown();
}
};
startServer(eventsHandler);
Expand All @@ -148,6 +152,7 @@ protected void onComplete(Request request, int status, HttpFields headers, Throw
String response = endPoint.getResponse();

assertThat(response, containsString("HTTP/1.1 200 OK"));
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertThat(NanoTime.millisSince(beginNanoTime.get()), greaterThan(900L));
assertThat(NanoTime.millisSince(readyNanoTime.get()), greaterThan(450L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,58 +303,28 @@ public static Throwable combine(Throwable t1, Throwable t2)
return t1;
}

public static void callAndThen(Throwable cause, Consumer<Throwable> first, Consumer<Throwable> second)
{
try
{
first.accept(cause);
}
catch (Throwable t)
{
addSuppressedIfNotAssociated(cause, t);
}
finally
{
second.accept(cause);
}
}

public static void callAndThen(Throwable cause, Consumer<Throwable> first, Runnable second)
{
try
{
first.accept(cause);
}
catch (Throwable t)
{
addSuppressedIfNotAssociated(cause, t);
}
finally
{
second.run();
}
}

public static void callAndThen(Runnable first, Runnable second)
/** Call a method that handles a {@link Throwable}, catching and associating any exception that it throws.
* @param cause The {@link Throwable} to pass to the consumer
* @param consumer The handler of a {@link Throwable}
*/
public static void call(Throwable cause, Consumer<Throwable> consumer)
{
try
{
first.run();
consumer.accept(cause);
}
catch (Throwable t)
{
// ignored
}
finally
{
second.run();
ExceptionUtil.addSuppressedIfNotAssociated(t, cause);
throw t;
}
}

/**
* Call a {@link Invocable.Callable} and handle failures
* @param callable The runnable to call
* @param failure The handling of failures
* Call a {@link Invocable.Callable} and handle any resulting failures
* @param callable The {@link org.eclipse.jetty.util.thread.Invocable.Callable} to call
* @param failure A handler of failures from the call
* @see #run(Runnable, Consumer)
*/
public static void call(Invocable.Callable callable, Consumer<Throwable> failure)
{
Expand All @@ -380,6 +350,7 @@ public static void call(Invocable.Callable callable, Consumer<Throwable> failure
* Call a {@link Runnable} and handle failures
* @param runnable The runnable to call
* @param failure The handling of failures
* @see #call(Throwable, Consumer)
*/
public static void run(Runnable runnable, Consumer<Throwable> failure)
{
Expand All @@ -401,6 +372,71 @@ public static void run(Runnable runnable, Consumer<Throwable> failure)
}
}

/**
* Call a handler of {@link Throwable} and then always call another, suppressing any exceptions thrown.
* @param cause The {@link Throwable} to be passed to both consumers.
* @param call The first {@link Consumer} of {@link Throwable} to call.
* @param then The second {@link Consumer} of {@link Throwable} to call.
*/
public static void callAndThen(Throwable cause, Consumer<Throwable> call, Consumer<Throwable> then)
{
try
{
call.accept(cause);
}
catch (Throwable t)
{
addSuppressedIfNotAssociated(cause, t);
}
finally
{
then.accept(cause);
}
}

/**
* Call a handler of {@link Throwable} and then always call a {@link Runnable}, suppressing any exceptions thrown.
* @param cause The {@link Throwable} to be passed to both consumers.
* @param call The {@link Consumer} of {@link Throwable} to call.
* @param then The {@link Runnable} to call.
*/
public static void callAndThen(Throwable cause, Consumer<Throwable> call, Runnable then)
{
try
{
call.accept(cause);
}
catch (Throwable t)
{
addSuppressedIfNotAssociated(cause, t);
}
finally
{
then.run();
}
}

/**
* Call a {@link Runnable} and then always call another, ignoring any exceptions thrown.
* @param call The first {@link Runnable} to call.
* @param then The second {@link Runnable} to call.
*/
public static void callAndThen(Runnable call, Runnable then)
{
try
{
call.run();
}
catch (Throwable t)
{
// ignored
}
finally
{
then.run();
}
}

/**
* <p>Get from a {@link CompletableFuture} and convert any uncheck exceptions to {@link RuntimeException}.</p>
* @param completableFuture The future to get from.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Session;
import org.eclipse.jetty.session.AbstractSessionManager;
import org.eclipse.jetty.session.DefaultSessionIdManager;
import org.eclipse.jetty.session.ManagedSession;
import org.eclipse.jetty.session.SessionCache;
import org.eclipse.jetty.session.SessionConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -70,10 +69,8 @@
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down

0 comments on commit d055ee2

Please sign in to comment.