Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CADC-12730 - update RepoClient to catch network errors and throw TransientExceptions #285

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion caom2-repo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ sourceCompatibility = 1.8

group = 'org.opencadc'

version = '1.4.4'
version = '1.4.5'

description = 'OpenCADC CAOM repository client library'
def git_url = 'https://github.com/opencadc/caom2db'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public class RepoClientTest {
private static final Logger log = Logger.getLogger(RepoClientTest.class);

static {
Log4jInit.setLevel("ca.nrc.cadc.caom2.repo.client", Level.DEBUG);
// Log4jInit.setLevel("ca.nrc.cadc.reg", Level.DEBUG);
Log4jInit.setLevel("ca.nrc.cadc.caom2.repo.client", Level.INFO);
}

// @Test
Expand All @@ -110,17 +109,13 @@ public void testTemplate() {
@Test
public void testGetObservationListCADC() {
try {
Subject s = AuthenticationUtil.getSubject(new NetrcAuthenticator(true));
Subject.doAs(s, new PrivilegedExceptionAction<Object>() {

@Override
public Object run() throws Exception {
RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8);
Subject s = AuthenticationUtil.getAnonSubject();
Subject.doAs(s, (PrivilegedExceptionAction<Object>) () -> {
RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8);

List<ObservationState> list = repoC.getObservationList("IRIS", null, null, 5);
Assert.assertEquals(6, list.size());
return null;
}
List<ObservationState> list = repoC.getObservationList("IRIS", null, null, 5);
Assert.assertEquals(6, list.size());
return null;
});
} catch (Exception unexpected) {
log.error("unexpected exception", unexpected);
Expand All @@ -132,17 +127,13 @@ public Object run() throws Exception {
public void testGetObservationListMAST() {
try {
Subject s = AuthenticationUtil.getAnonSubject();
Subject.doAs(s, new PrivilegedExceptionAction<Object>() {

@Override
public Object run() throws Exception {
RepoClient repoC = new RepoClient(URI.create("ivo://mast.stsci.edu/caom2repo"), 8);
Subject.doAs(s, (PrivilegedExceptionAction<Object>) () -> {
RepoClient repoC = new RepoClient(URI.create("ivo://mast.stsci.edu/caom2repo"), 8);

List<ObservationState> list = repoC.getObservationList("HST", null, null, 5);
Assert.assertEquals(6, list.size());
List<ObservationState> list = repoC.getObservationList("HST", null, null, 5);
Assert.assertEquals(6, list.size());

return null;
}
return null;
});
} catch (Exception unexpected) {
log.error("unexpected exception", unexpected);
Expand All @@ -154,17 +145,13 @@ public Object run() throws Exception {
public void testGetObservationListDenied() {
try {
Subject s = AuthenticationUtil.getAnonSubject();
Subject.doAs(s, new PrivilegedExceptionAction<Object>() {
Subject.doAs(s, (PrivilegedExceptionAction<Object>) () -> {
RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8);

@Override
public Object run() throws Exception {
RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8);
List<ObservationState> list = repoC.getObservationList("DAO", null, null, 5);
Assert.fail("expected exception, got results");

List<ObservationState> list = repoC.getObservationList("DAO", null, null, 5);
Assert.fail("expected exception, got results");

return null;
}
return null;
});
} catch (AccessControlException expected) {
log.info("caught expected exception: " + expected);
Expand All @@ -177,20 +164,16 @@ public Object run() throws Exception {
@Test
public void testGet() {
try {
Subject s = AuthenticationUtil.getSubject(new NetrcAuthenticator(true));
Subject.doAs(s, new PrivilegedExceptionAction<Object>() {

@Override
public Object run() throws Exception {
RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8);
Subject s = AuthenticationUtil.getAnonSubject();
Subject.doAs(s, (PrivilegedExceptionAction<Object>) () -> {
RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8);

ObservationResponse wr = repoC.get(new ObservationURI("IRIS", "f001h000"));
Assert.assertNotNull(wr);
Assert.assertNotNull(wr.observation);
Assert.assertFalse(wr.observation.getPlanes().isEmpty());
ObservationResponse wr = repoC.get(new ObservationURI("IRIS", "f001h000"));
Assert.assertNotNull(wr);
Assert.assertNotNull(wr.observation);
Assert.assertFalse(wr.observation.getPlanes().isEmpty());

return null;
}
return null;
});
} catch (Exception unexpected) {
log.error("unexpected exception", unexpected);
Expand Down
123 changes: 53 additions & 70 deletions caom2-repo/src/main/java/ca/nrc/cadc/caom2/repo/client/RepoClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

import ca.nrc.cadc.auth.AuthMethod;
import ca.nrc.cadc.auth.AuthenticationUtil;
import ca.nrc.cadc.auth.NotAuthenticatedException;
import ca.nrc.cadc.caom2.DeletedObservation;
import ca.nrc.cadc.caom2.ObservationResponse;
import ca.nrc.cadc.caom2.ObservationState;
Expand All @@ -79,17 +80,20 @@
import ca.nrc.cadc.caom2.repo.client.transform.DeletionListReader;
import ca.nrc.cadc.caom2.repo.client.transform.ObservationStateListReader;
import ca.nrc.cadc.date.DateUtil;
import ca.nrc.cadc.net.HttpDownload;
import ca.nrc.cadc.net.ExpectationFailedException;
import ca.nrc.cadc.net.HttpGet;
import ca.nrc.cadc.net.InputStreamWrapper;
import ca.nrc.cadc.net.PreconditionFailedException;
import ca.nrc.cadc.net.RangeNotSatisfiableException;
import ca.nrc.cadc.net.ResourceAlreadyExistsException;
import ca.nrc.cadc.net.ResourceNotFoundException;
import ca.nrc.cadc.net.TransientException;
import ca.nrc.cadc.reg.Capabilities;
import ca.nrc.cadc.reg.CapabilitiesReader;
import ca.nrc.cadc.reg.Capability;
import ca.nrc.cadc.reg.Interface;
import ca.nrc.cadc.reg.Standards;
import ca.nrc.cadc.reg.client.RegistryClient;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -436,17 +440,17 @@ private List<ObservationState> readObservationStateList(ObservationStateListRead
if (tooBigRequest) {
rec = DEFAULT_BATCH_SIZE;
}
// Use HttpDownload to make the http GET calls (because it handles a lot
// Use HttpGet to make the http GET calls (because it handles a lot
// of the
// authentication stuff)
boolean go = true;
String surlCommon = baseServiceURL.toExternalForm() + File.separator + collection;

while (go) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
if (!tooBigRequest) {
go = false;// only one go
}

String surl = surlCommon;
surl = surl + "?maxrec=" + (rec + 1);
if (start != null) {
Expand All @@ -455,43 +459,33 @@ private List<ObservationState> readObservationStateList(ObservationStateListRead
if (end != null) {
surl = surl + "&end=" + df.format(end);
}
URL url;
log.debug("URL: " + surl);

URL url;
try {
url = new URL(surl);
HttpDownload get = new HttpDownload(url, bos);
get.setFollowRedirects(true);

get.run();
int responseCode = get.getResponseCode();
log.debug("RESPONSE CODE: '" + responseCode + "'");

// if (responseCode == 302) {
// // redirected url
// url = get.getRedirectURL();
// log.debug("REDIRECTED URL: " + url);
// bos = new ByteArrayOutputStream();
// get = new HttpDownload(url, bos);
// get.run();
// responseCode = get.getResponseCode();
// log.debug("RESPONSE CODE (REDIRECTED URL): '" + responseCode
// + "'");
//
// }

if (get.getThrowable() != null) {
if (get.getThrowable() instanceof AccessControlException) {
throw (AccessControlException) get.getThrowable();
}
throw new RuntimeException("failed to get observation list", get.getThrowable());
}
} catch (MalformedURLException e) {
throw new RuntimeException("BUG: failed to generate observation list url", e);
}

HttpGet get = new HttpGet(url, true);
try {
// log.debug("RESPONSE = '" + bos.toString() + "'");
List<ObservationState> partialList = transformer.read(new ByteArrayInputStream(bos.toByteArray()));
get.prepare();
} catch (AccessControlException | NotAuthenticatedException e) {
throw new AccessControlException(e.getMessage());
} catch (TransientException | IOException | ResourceNotFoundException
| ResourceAlreadyExistsException e) {
throw new TransientException(e.getMessage());
} catch (ExpectationFailedException | IllegalArgumentException
| PreconditionFailedException | RangeNotSatisfiableException
| InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
log.debug(String.format("RESPONSE CODE: '%s'", get.getResponseCode()));
}

try {
List<ObservationState> partialList = transformer.read(get.getInputStream());
if (partialList != null && !partialList.isEmpty() && !accList.isEmpty() && accList.get(accList.size() - 1).equals(partialList.get(0))) {
partialList.remove(0);
}
Expand All @@ -501,8 +495,6 @@ private List<ObservationState> readObservationStateList(ObservationStateListRead
log.debug("adding " + partialList.size() + " elements to accList. Now there are " + accList.size());
}

bos.close();

if (accList.size() > 0) {
start = accList.get(accList.size() - 1).maxLastModified;
}
Expand All @@ -527,14 +519,14 @@ private List<ObservationState> readObservationStateList(ObservationStateListRead
}
}
} catch (ParseException | URISyntaxException | IOException e) {
throw new RuntimeException("Unable to list of ObservationState from " + bos.toString(), e);
throw new RuntimeException("Unable to list of ObservationState from " + surl, e);
}
}
return accList;

}

// pdowler: was going to use this so the HttpDownload would pass the
// pdowler: was going to use this so the HttpGet would pass the
// InputStream directly to the reader
// but will take additional refactoring
private class StreamingListReader<T> implements InputStreamWrapper {
Expand Down Expand Up @@ -570,17 +562,17 @@ private List<DeletedObservation> readDeletedEntityList(DeletionListReader transf
if (tooBigRequest) {
rec = DEFAULT_BATCH_SIZE;
}
// Use HttpDownload to make the http GET calls (because it handles a lot
// Use HttpGet to make the http GET calls (because it handles a lot
// of the
// authentication stuff)
boolean go = true;
String surlCommon = baseDeletionURL.toExternalForm() + File.separator + collection;

while (go) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
if (!tooBigRequest) {
go = false;// only one go
}

String surl = surlCommon;
surl = surl + "?maxRec=" + (rec + 1);
if (start != null) {
Expand All @@ -589,41 +581,34 @@ private List<DeletedObservation> readDeletedEntityList(DeletionListReader transf
if (end != null) {
surl = surl + "&end=" + df.format(end);
}
URL url;
log.debug("URL: " + surl);

URL url;
try {
url = new URL(surl);
HttpDownload get = new HttpDownload(url, bos);
get.setFollowRedirects(true);

get.run();
int responseCode = get.getResponseCode();
log.debug("RESPONSE CODE: '" + responseCode + "'");

if (responseCode == 302) {
// redirected url
url = get.getRedirectURL();
log.debug("REDIRECTED URL: " + url);
bos = new ByteArrayOutputStream();
get = new HttpDownload(url, bos);
responseCode = get.getResponseCode();
log.debug("RESPONSE CODE (REDIRECTED URL): '" + responseCode + "'");

}

if (get.getThrowable() != null) {
if (get.getThrowable() instanceof AccessControlException) {
throw (AccessControlException) get.getThrowable();
}
throw new RuntimeException("failed to get observation list", get.getThrowable());
}
} catch (MalformedURLException e) {
throw new RuntimeException("BUG: failed to generate observation list url", e);
}

HttpGet get = new HttpGet(url, true);

try {
get.prepare();
} catch (AccessControlException | NotAuthenticatedException e) {
throw new AccessControlException(e.getMessage());
} catch (TransientException | IOException | ResourceNotFoundException
| ResourceAlreadyExistsException e) {
throw new TransientException(e.getMessage());
} catch (ExpectationFailedException | IllegalArgumentException
| PreconditionFailedException | RangeNotSatisfiableException
| InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
log.debug(String.format("RESPONSE CODE: '%s'", get.getResponseCode()));
}

try {
// log.debug("RESPONSE = '" + bos.toString() + "'");
partialList = transformer.read(new ByteArrayInputStream(bos.toByteArray()));
partialList = transformer.read(get.getInputStream());
// partialList =
// transformByteArrayOutputStreamIntoListOfObservationState(bos,
// df, '\t', '\n');
Expand All @@ -635,10 +620,8 @@ private List<DeletedObservation> readDeletedEntityList(DeletionListReader transf
accList.addAll(partialList);
log.debug("adding " + partialList.size() + " elements to accList. Now there are " + accList.size());
}

bos.close();
} catch (ParseException | URISyntaxException | IOException e) {
throw new RuntimeException("Unable to list of ObservationState from " + bos.toString(), e);
throw new RuntimeException("Unable to list of ObservationState from " + surl, e);
}

if (accList.size() > 0) {
Expand Down
2 changes: 1 addition & 1 deletion icewind/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
implementation 'org.opencadc:cadc-util:[1.6,2.0)'
implementation 'org.opencadc:caom2:[2.4.4,2.5)'
implementation 'org.opencadc:caom2persistence:[2.4.14,2.5)'
implementation 'org.opencadc:caom2-repo:[1.4,1.5)'
implementation 'org.opencadc:caom2-repo:[1.4.5,1.5)'

// needed for validation
implementation 'org.opencadc:caom2-compute:[2.4.6,2.5)'
Expand Down
18 changes: 12 additions & 6 deletions icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import ca.nrc.cadc.caom2.version.InitDatabase;
import ca.nrc.cadc.db.ConnectionConfig;
import ca.nrc.cadc.db.DBUtil;
import ca.nrc.cadc.net.TransientException;
import java.io.File;
import java.net.URI;
import java.util.Date;
Expand Down Expand Up @@ -187,14 +188,19 @@ public void run() {
initDel = (hs.curID == null && hs.curLastModified == null); // never harvested
}

// delete observations before harvest to avoid observationURI conflicts from delete+create
obsDeleter.setInitHarvestState(initDel);
obsDeleter.run();
try {
// delete observations before harvest to avoid observationURI conflicts from delete+create
obsDeleter.setInitHarvestState(initDel);
obsDeleter.run();

// harvest observations
obsHarvester.run();
ingested += obsHarvester.getIngested();
// harvest observations
obsHarvester.run();
ingested += obsHarvester.getIngested();
} catch (TransientException e) {
ingested = 0;
}
}

if (this.exitWhenComplete) {
log.info("exitWhenComplete=" + exitWhenComplete + ": DONE");
done = true;
Expand Down
Loading
Loading