Skip to content

Commit

Permalink
Recreate or wrap exceptions thrown by async transport implementations…
Browse files Browse the repository at this point in the history
… to preserve caller stack traces (#656)

* wrap exceptions thrown by async transport implementations in IOException

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* license headers

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* changelog

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* tolerate null causes

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* fix tests

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* remove print

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* one more license header

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* fix last non-aws test

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* use multicatch to restrict caught exceptions

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* Replicate the RestClient exception wrapping for async invocation flow

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* replicate hc5 exception extraction strategy in aws transport

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* move other tests

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* lint

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* delete aws test for now; add support for OpenSearchClientException

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* reintroduce an aws test, sadly, not extending the abstract case

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* java 8

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* replicate ISE

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* poke

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* handle some netty-specific channel exceptions

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* poke

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* nevermind netty

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* io before rt

Signed-off-by: Andrew Parmet <andrew@parmet.com>

* no hc5

Signed-off-by: Andrew Parmet <andrew@parmet.com>

---------

Signed-off-by: Andrew Parmet <andrew@parmet.com>
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Co-authored-by: Andriy Redko <andriy.redko@aiven.io>
(cherry picked from commit db50b7d)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and reta committed May 30, 2024
1 parent c5a84c1 commit 8d7bab7
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Fixed
- ApacheHttpClient5Transport requires Apache Commons Logging dependency ([#1003](https://github.com/opensearch-project/opensearch-java/pull/1003))
- Preserve caller information in stack traces when synchronous callers use asynchronous transports ([#656](https://github.com/opensearch-project/opensearch-java/pull/656))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
Expand All @@ -30,12 +32,14 @@
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch._types.ErrorCause;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.generic.OpenSearchClientException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.GenericEndpoint;
import org.opensearch.client.transport.JsonEndpoint;
Expand Down Expand Up @@ -199,17 +203,14 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
try {
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
if (cause instanceof IOException) {
throw (IOException) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(cause);
Exception cause = extractAndWrapCause(e);
if (cause instanceof IOException) {
throw (IOException) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(e);
throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause);
} catch (InterruptedException e) {
throw new IOException("HttpRequest was interrupted", e);
}
Expand Down Expand Up @@ -615,4 +616,56 @@ private static <T> Optional<T> or(Optional<T> opt, Supplier<? extends Optional<?
return Objects.requireNonNull(r);
}
}

/**
* Wrap the exception so the caller's signature shows up in the stack trace, taking care to copy the original type and message
* where possible so async and sync code don't have to check different exceptions.
*/
private static Exception extractAndWrapCause(Exception exception) {
if (exception instanceof InterruptedException) {
throw new RuntimeException("thread waiting for the response was interrupted", exception);
}
if (exception instanceof ExecutionException) {
ExecutionException executionException = (ExecutionException) exception;
Throwable t = executionException.getCause() == null ? executionException : executionException.getCause();
if (t instanceof Error) {
throw (Error) t;
}
exception = (Exception) t;
}
if (exception instanceof SocketTimeoutException) {
SocketTimeoutException e = new SocketTimeoutException(exception.getMessage());
e.initCause(exception);
return e;
}
if (exception instanceof SSLHandshakeException) {
SSLHandshakeException e = new SSLHandshakeException(
exception.getMessage() + "\nSee https://opensearch.org/docs/latest/clients/java/ for troubleshooting."
);
e.initCause(exception);
return e;
}
if (exception instanceof ConnectException) {
ConnectException e = new ConnectException(exception.getMessage());
e.initCause(exception);
return e;
}
if (exception instanceof IOException) {
return new IOException(exception.getMessage(), exception);
}
if (exception instanceof OpenSearchException) {
final OpenSearchException e = new OpenSearchException(((OpenSearchException) exception).response());
e.initCause(exception);
return e;
}
if (exception instanceof OpenSearchClientException) {
final OpenSearchClientException e = new OpenSearchClientException(((OpenSearchClientException) exception).response());
e.initCause(exception);
return e;
}
if (exception instanceof RuntimeException) {
return new RuntimeException(exception.getMessage(), exception);
}
return new RuntimeException("error while performing request", exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.AbstractMap;
Expand All @@ -36,13 +38,16 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.auth.AuthCache;
import org.apache.hc.client5.http.auth.AuthScheme;
import org.apache.hc.client5.http.auth.AuthScope;
Expand All @@ -57,6 +62,7 @@
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
Expand All @@ -77,6 +83,8 @@
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.generic.OpenSearchClientException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.GenericEndpoint;
import org.opensearch.client.transport.GenericSerializable;
Expand Down Expand Up @@ -145,15 +153,16 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
TransportOptions options
) throws IOException {
try {
return performRequestAsync(request, endpoint, options).join();
} catch (final CompletionException ex) {
if (ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
} else if (ex.getCause() instanceof IOException) {
throw (IOException) ex.getCause();
} else {
throw new IOException(ex.getCause());
return performRequestAsync(request, endpoint, options).get();
} catch (final Exception ex) {
Exception cause = extractAndWrapCause(ex);
if (cause instanceof IOException) {
throw (IOException) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause);
}
}

Expand Down Expand Up @@ -1083,4 +1092,77 @@ public InputStream asInput() {
return new ByteArrayInputStream(this.buf, 0, this.count);
}
}

/**
* Wrap the exception so the caller's signature shows up in the stack trace, taking care to copy the original type and message
* where possible so async and sync code don't have to check different exceptions.
*/
private static Exception extractAndWrapCause(Exception exception) {
if (exception instanceof InterruptedException) {
throw new RuntimeException("thread waiting for the response was interrupted", exception);
}
if (exception instanceof ExecutionException) {
ExecutionException executionException = (ExecutionException) exception;
Throwable t = executionException.getCause() == null ? executionException : executionException.getCause();
if (t instanceof Error) {
throw (Error) t;
}
exception = (Exception) t;
}
if (exception instanceof ConnectTimeoutException) {
ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage());
e.initCause(exception);
return e;
}
if (exception instanceof SocketTimeoutException) {
SocketTimeoutException e = new SocketTimeoutException(exception.getMessage());
e.initCause(exception);
return e;
}
if (exception instanceof ConnectionClosedException) {
ConnectionClosedException e = new ConnectionClosedException(exception.getMessage());
e.initCause(exception);
return e;
}
if (exception instanceof SSLHandshakeException) {
SSLHandshakeException e = new SSLHandshakeException(
exception.getMessage() + "\nSee https://opensearch.org/docs/latest/clients/java/ for troubleshooting."
);
e.initCause(exception);
return e;
}
if (exception instanceof ConnectException) {
ConnectException e = new ConnectException(exception.getMessage());
e.initCause(exception);
return e;
}
if (exception instanceof ResponseException) {
try {
ResponseException e = new ResponseException(((ResponseException) exception).getResponse());
e.initCause(exception);
return e;
} catch (final IOException ex) {
// We are not able to reconstruct the response, throw IOException instead
return new IOException(exception.getMessage(), exception);
}
}
if (exception instanceof IOException) {
return new IOException(exception.getMessage(), exception);
}
if (exception instanceof OpenSearchException) {
final OpenSearchException e = new OpenSearchException(((OpenSearchException) exception).response());
e.initCause(exception);
return e;
}
if (exception instanceof OpenSearchClientException) {
final OpenSearchClientException e = new OpenSearchClientException(((OpenSearchClientException) exception).response());
e.initCause(exception);
return e;
}
if (exception instanceof RuntimeException) {
return new RuntimeException(exception.getMessage(), exception);
}
return new RuntimeException("error while performing request", exception);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.opensearch.integTest.aws;

import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.util.List;
import org.apache.logging.log4j.core.util.Throwables;
import org.junit.Test;
import org.opensearch.client.opensearch.OpenSearchClient;

// It would be nice to extend AbstractAsyncStracktraceIT.
public class AwsSdk2AsyncStacktraceIT extends AwsSdk2TransportTestCase {
@Test
public void testFailureFromClientPreservesStacktraceOfCaller() throws Exception {
final OpenSearchClient client = getClient(false, null, null);
Exception thrown = assertThrows(Exception.class, () -> client.indices().get(g -> g.index("nonexisting-index")));

List<String> stacktraceElements = Throwables.toStringList(thrown);
boolean someElementContainsCallerMethodName = stacktraceElements.stream()
.anyMatch(it -> it.contains("testFailureFromClientPreservesStacktraceOfCaller"));

assertTrue(someElementContainsCallerMethodName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.opensearch.integTest;

import org.apache.logging.log4j.core.util.Throwables;
import org.junit.Test;

public abstract class AbstractAsyncStracktraceIT extends OpenSearchJavaClientTestCase {
@Test
public void testFailureFromClientPreservesStacktraceOfCaller() throws Exception {
var thrown = assertThrows(Exception.class, () -> javaClient().indices().get(g -> g.index("nonexisting-index")));

var stacktraceElements = Throwables.toStringList(thrown);
var someElementContainsCallerMethodName = stacktraceElements.stream()
.anyMatch(it -> it.contains("testFailureFromClientPreservesStacktraceOfCaller"));

assertTrue(someElementContainsCallerMethodName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.opensearch.integTest.httpclient5;

import org.opensearch.client.opensearch.integTest.AbstractAsyncStracktraceIT;

public class AsyncStacktraceIT extends AbstractAsyncStracktraceIT implements HttpClient5TransportSupport {}

0 comments on commit 8d7bab7

Please sign in to comment.