From 6a196d6c0773baa59b77a64bcbd3f174c49e7593 Mon Sep 17 00:00:00 2001 From: Jeff Burke Date: Fri, 18 Aug 2023 10:51:21 -0700 Subject: [PATCH] CADC-12730 - update RepoClient to catch network errors and throw TransientException and CaomHarvester to catch the TransientException and continue processing. --- caom2-repo/build.gradle | 2 +- .../caom2/repo/client/RepoClientTest.java | 67 ++++------ .../cadc/caom2/repo/client/RepoClient.java | 123 ++++++++---------- icewind/VERSION | 2 +- icewind/build.gradle | 2 +- .../org/opencadc/icewind/CaomHarvester.java | 18 ++- .../opencadc/icewind/DeletionHarvester.java | 3 +- .../icewind/ObservationHarvester.java | 8 +- 8 files changed, 98 insertions(+), 127 deletions(-) diff --git a/caom2-repo/build.gradle b/caom2-repo/build.gradle index 3663d2dd..51d18990 100644 --- a/caom2-repo/build.gradle +++ b/caom2-repo/build.gradle @@ -17,7 +17,7 @@ sourceCompatibility = 1.8 group = 'org.opencadc' -version = '1.4.4' +version = '1.4.5' description = 'OpenCADC CAOM repository client library' def git_url = 'https://github.com/opencadc/caom2db' diff --git a/caom2-repo/src/intTest/java/ca/nrc/cadc/caom2/repo/client/RepoClientTest.java b/caom2-repo/src/intTest/java/ca/nrc/cadc/caom2/repo/client/RepoClientTest.java index 4990d342..cb4ee1b7 100644 --- a/caom2-repo/src/intTest/java/ca/nrc/cadc/caom2/repo/client/RepoClientTest.java +++ b/caom2-repo/src/intTest/java/ca/nrc/cadc/caom2/repo/client/RepoClientTest.java @@ -93,8 +93,7 @@ public class RepoClientTest { private static final Logger log = Logger.getLogger(RepoClientTest.class); static { - Log4jInit.setLevel("ca.nrc.cadc.caom2.repo.client", Level.DEBUG); - // Log4jInit.setLevel("ca.nrc.cadc.reg", Level.DEBUG); + Log4jInit.setLevel("ca.nrc.cadc.caom2.repo.client", Level.INFO); } // @Test @@ -110,17 +109,13 @@ public void testTemplate() { @Test public void testGetObservationListCADC() { try { - Subject s = AuthenticationUtil.getSubject(new NetrcAuthenticator(true)); - Subject.doAs(s, new PrivilegedExceptionAction() { - - @Override - public Object run() throws Exception { - RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8); + Subject s = AuthenticationUtil.getAnonSubject(); + Subject.doAs(s, (PrivilegedExceptionAction) () -> { + RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8); - List list = repoC.getObservationList("IRIS", null, null, 5); - Assert.assertEquals(6, list.size()); - return null; - } + List list = repoC.getObservationList("IRIS", null, null, 5); + Assert.assertEquals(6, list.size()); + return null; }); } catch (Exception unexpected) { log.error("unexpected exception", unexpected); @@ -132,17 +127,13 @@ public Object run() throws Exception { public void testGetObservationListMAST() { try { Subject s = AuthenticationUtil.getAnonSubject(); - Subject.doAs(s, new PrivilegedExceptionAction() { - - @Override - public Object run() throws Exception { - RepoClient repoC = new RepoClient(URI.create("ivo://mast.stsci.edu/caom2repo"), 8); + Subject.doAs(s, (PrivilegedExceptionAction) () -> { + RepoClient repoC = new RepoClient(URI.create("ivo://mast.stsci.edu/caom2repo"), 8); - List list = repoC.getObservationList("HST", null, null, 5); - Assert.assertEquals(6, list.size()); + List list = repoC.getObservationList("HST", null, null, 5); + Assert.assertEquals(6, list.size()); - return null; - } + return null; }); } catch (Exception unexpected) { log.error("unexpected exception", unexpected); @@ -154,17 +145,13 @@ public Object run() throws Exception { public void testGetObservationListDenied() { try { Subject s = AuthenticationUtil.getAnonSubject(); - Subject.doAs(s, new PrivilegedExceptionAction() { + Subject.doAs(s, (PrivilegedExceptionAction) () -> { + RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8); - @Override - public Object run() throws Exception { - RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8); + List list = repoC.getObservationList("DAO", null, null, 5); + Assert.fail("expected exception, got results"); - List list = repoC.getObservationList("DAO", null, null, 5); - Assert.fail("expected exception, got results"); - - return null; - } + return null; }); } catch (AccessControlException expected) { log.info("caught expected exception: " + expected); @@ -177,20 +164,16 @@ public Object run() throws Exception { @Test public void testGet() { try { - Subject s = AuthenticationUtil.getSubject(new NetrcAuthenticator(true)); - Subject.doAs(s, new PrivilegedExceptionAction() { - - @Override - public Object run() throws Exception { - RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8); + Subject s = AuthenticationUtil.getAnonSubject(); + Subject.doAs(s, (PrivilegedExceptionAction) () -> { + RepoClient repoC = new RepoClient(URI.create("ivo://cadc.nrc.ca/ams"), 8); - ObservationResponse wr = repoC.get(new ObservationURI("IRIS", "f001h000")); - Assert.assertNotNull(wr); - Assert.assertNotNull(wr.observation); - Assert.assertFalse(wr.observation.getPlanes().isEmpty()); + ObservationResponse wr = repoC.get(new ObservationURI("IRIS", "f001h000")); + Assert.assertNotNull(wr); + Assert.assertNotNull(wr.observation); + Assert.assertFalse(wr.observation.getPlanes().isEmpty()); - return null; - } + return null; }); } catch (Exception unexpected) { log.error("unexpected exception", unexpected); diff --git a/caom2-repo/src/main/java/ca/nrc/cadc/caom2/repo/client/RepoClient.java b/caom2-repo/src/main/java/ca/nrc/cadc/caom2/repo/client/RepoClient.java index de647b89..43198888 100644 --- a/caom2-repo/src/main/java/ca/nrc/cadc/caom2/repo/client/RepoClient.java +++ b/caom2-repo/src/main/java/ca/nrc/cadc/caom2/repo/client/RepoClient.java @@ -71,6 +71,7 @@ import ca.nrc.cadc.auth.AuthMethod; import ca.nrc.cadc.auth.AuthenticationUtil; +import ca.nrc.cadc.auth.NotAuthenticatedException; import ca.nrc.cadc.caom2.DeletedObservation; import ca.nrc.cadc.caom2.ObservationResponse; import ca.nrc.cadc.caom2.ObservationState; @@ -79,17 +80,20 @@ import ca.nrc.cadc.caom2.repo.client.transform.DeletionListReader; import ca.nrc.cadc.caom2.repo.client.transform.ObservationStateListReader; import ca.nrc.cadc.date.DateUtil; -import ca.nrc.cadc.net.HttpDownload; +import ca.nrc.cadc.net.ExpectationFailedException; +import ca.nrc.cadc.net.HttpGet; import ca.nrc.cadc.net.InputStreamWrapper; +import ca.nrc.cadc.net.PreconditionFailedException; +import ca.nrc.cadc.net.RangeNotSatisfiableException; +import ca.nrc.cadc.net.ResourceAlreadyExistsException; import ca.nrc.cadc.net.ResourceNotFoundException; +import ca.nrc.cadc.net.TransientException; import ca.nrc.cadc.reg.Capabilities; import ca.nrc.cadc.reg.CapabilitiesReader; import ca.nrc.cadc.reg.Capability; import ca.nrc.cadc.reg.Interface; import ca.nrc.cadc.reg.Standards; import ca.nrc.cadc.reg.client.RegistryClient; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -436,17 +440,17 @@ private List readObservationStateList(ObservationStateListRead if (tooBigRequest) { rec = DEFAULT_BATCH_SIZE; } - // Use HttpDownload to make the http GET calls (because it handles a lot + // Use HttpGet to make the http GET calls (because it handles a lot // of the // authentication stuff) boolean go = true; String surlCommon = baseServiceURL.toExternalForm() + File.separator + collection; while (go) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); if (!tooBigRequest) { go = false;// only one go } + String surl = surlCommon; surl = surl + "?maxrec=" + (rec + 1); if (start != null) { @@ -455,43 +459,33 @@ private List readObservationStateList(ObservationStateListRead if (end != null) { surl = surl + "&end=" + df.format(end); } - URL url; log.debug("URL: " + surl); + + URL url; try { url = new URL(surl); - HttpDownload get = new HttpDownload(url, bos); - get.setFollowRedirects(true); - - get.run(); - int responseCode = get.getResponseCode(); - log.debug("RESPONSE CODE: '" + responseCode + "'"); - - // if (responseCode == 302) { - // // redirected url - // url = get.getRedirectURL(); - // log.debug("REDIRECTED URL: " + url); - // bos = new ByteArrayOutputStream(); - // get = new HttpDownload(url, bos); - // get.run(); - // responseCode = get.getResponseCode(); - // log.debug("RESPONSE CODE (REDIRECTED URL): '" + responseCode - // + "'"); - // - // } - - if (get.getThrowable() != null) { - if (get.getThrowable() instanceof AccessControlException) { - throw (AccessControlException) get.getThrowable(); - } - throw new RuntimeException("failed to get observation list", get.getThrowable()); - } } catch (MalformedURLException e) { throw new RuntimeException("BUG: failed to generate observation list url", e); } + HttpGet get = new HttpGet(url, true); try { - // log.debug("RESPONSE = '" + bos.toString() + "'"); - List partialList = transformer.read(new ByteArrayInputStream(bos.toByteArray())); + get.prepare(); + } catch (AccessControlException | NotAuthenticatedException e) { + throw new AccessControlException(e.getMessage()); + } catch (TransientException | IOException | ResourceNotFoundException + | ResourceAlreadyExistsException e) { + throw new TransientException(e.getMessage()); + } catch (ExpectationFailedException | IllegalArgumentException + | PreconditionFailedException | RangeNotSatisfiableException + | InterruptedException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + log.debug(String.format("RESPONSE CODE: '%s'", get.getResponseCode())); + } + + try { + List partialList = transformer.read(get.getInputStream()); if (partialList != null && !partialList.isEmpty() && !accList.isEmpty() && accList.get(accList.size() - 1).equals(partialList.get(0))) { partialList.remove(0); } @@ -501,8 +495,6 @@ private List readObservationStateList(ObservationStateListRead log.debug("adding " + partialList.size() + " elements to accList. Now there are " + accList.size()); } - bos.close(); - if (accList.size() > 0) { start = accList.get(accList.size() - 1).maxLastModified; } @@ -527,14 +519,14 @@ private List readObservationStateList(ObservationStateListRead } } } catch (ParseException | URISyntaxException | IOException e) { - throw new RuntimeException("Unable to list of ObservationState from " + bos.toString(), e); + throw new RuntimeException("Unable to list of ObservationState from " + surl, e); } } return accList; } - // pdowler: was going to use this so the HttpDownload would pass the + // pdowler: was going to use this so the HttpGet would pass the // InputStream directly to the reader // but will take additional refactoring private class StreamingListReader implements InputStreamWrapper { @@ -570,17 +562,17 @@ private List readDeletedEntityList(DeletionListReader transf if (tooBigRequest) { rec = DEFAULT_BATCH_SIZE; } - // Use HttpDownload to make the http GET calls (because it handles a lot + // Use HttpGet to make the http GET calls (because it handles a lot // of the // authentication stuff) boolean go = true; String surlCommon = baseDeletionURL.toExternalForm() + File.separator + collection; while (go) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); if (!tooBigRequest) { go = false;// only one go } + String surl = surlCommon; surl = surl + "?maxRec=" + (rec + 1); if (start != null) { @@ -589,41 +581,34 @@ private List readDeletedEntityList(DeletionListReader transf if (end != null) { surl = surl + "&end=" + df.format(end); } - URL url; log.debug("URL: " + surl); + + URL url; try { url = new URL(surl); - HttpDownload get = new HttpDownload(url, bos); - get.setFollowRedirects(true); - - get.run(); - int responseCode = get.getResponseCode(); - log.debug("RESPONSE CODE: '" + responseCode + "'"); - - if (responseCode == 302) { - // redirected url - url = get.getRedirectURL(); - log.debug("REDIRECTED URL: " + url); - bos = new ByteArrayOutputStream(); - get = new HttpDownload(url, bos); - responseCode = get.getResponseCode(); - log.debug("RESPONSE CODE (REDIRECTED URL): '" + responseCode + "'"); - - } - - if (get.getThrowable() != null) { - if (get.getThrowable() instanceof AccessControlException) { - throw (AccessControlException) get.getThrowable(); - } - throw new RuntimeException("failed to get observation list", get.getThrowable()); - } } catch (MalformedURLException e) { throw new RuntimeException("BUG: failed to generate observation list url", e); } + HttpGet get = new HttpGet(url, true); + + try { + get.prepare(); + } catch (AccessControlException | NotAuthenticatedException e) { + throw new AccessControlException(e.getMessage()); + } catch (TransientException | IOException | ResourceNotFoundException + | ResourceAlreadyExistsException e) { + throw new TransientException(e.getMessage()); + } catch (ExpectationFailedException | IllegalArgumentException + | PreconditionFailedException | RangeNotSatisfiableException + | InterruptedException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + log.debug(String.format("RESPONSE CODE: '%s'", get.getResponseCode())); + } + try { - // log.debug("RESPONSE = '" + bos.toString() + "'"); - partialList = transformer.read(new ByteArrayInputStream(bos.toByteArray())); + partialList = transformer.read(get.getInputStream()); // partialList = // transformByteArrayOutputStreamIntoListOfObservationState(bos, // df, '\t', '\n'); @@ -635,10 +620,8 @@ private List readDeletedEntityList(DeletionListReader transf accList.addAll(partialList); log.debug("adding " + partialList.size() + " elements to accList. Now there are " + accList.size()); } - - bos.close(); } catch (ParseException | URISyntaxException | IOException e) { - throw new RuntimeException("Unable to list of ObservationState from " + bos.toString(), e); + throw new RuntimeException("Unable to list of ObservationState from " + surl, e); } if (accList.size() > 0) { diff --git a/icewind/VERSION b/icewind/VERSION index f26bb800..348557d7 100644 --- a/icewind/VERSION +++ b/icewind/VERSION @@ -1,6 +1,6 @@ ## deployable containers have a semantic and build tag # semantic version tag: major.minor[.patch] # build version tag: timestamp -VER=0.9.2 +VER=0.9.3 TAGS="${VER} ${VER}-$(date --utc +"%Y%m%dT%H%M%S")" unset VER diff --git a/icewind/build.gradle b/icewind/build.gradle index a5025a0b..5becb5f4 100644 --- a/icewind/build.gradle +++ b/icewind/build.gradle @@ -20,7 +20,7 @@ dependencies { implementation 'org.opencadc:cadc-util:[1.6,2.0)' implementation 'org.opencadc:caom2:[2.4.4,2.5)' implementation 'org.opencadc:caom2persistence:[2.4.14,2.5)' - implementation 'org.opencadc:caom2-repo:[1.4,1.5)' + implementation 'org.opencadc:caom2-repo:[1.4.5,1.5)' // needed for validation implementation 'org.opencadc:caom2-compute:[2.4.6,2.5)' diff --git a/icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java b/icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java index de233d8c..064c8901 100644 --- a/icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java @@ -74,6 +74,7 @@ import ca.nrc.cadc.caom2.version.InitDatabase; import ca.nrc.cadc.db.ConnectionConfig; import ca.nrc.cadc.db.DBUtil; +import ca.nrc.cadc.net.TransientException; import java.io.File; import java.net.URI; import java.util.Date; @@ -187,14 +188,19 @@ public void run() { initDel = (hs.curID == null && hs.curLastModified == null); // never harvested } - // delete observations before harvest to avoid observationURI conflicts from delete+create - obsDeleter.setInitHarvestState(initDel); - obsDeleter.run(); + try { + // delete observations before harvest to avoid observationURI conflicts from delete+create + obsDeleter.setInitHarvestState(initDel); + obsDeleter.run(); - // harvest observations - obsHarvester.run(); - ingested += obsHarvester.getIngested(); + // harvest observations + obsHarvester.run(); + ingested += obsHarvester.getIngested(); + } catch (TransientException e) { + ingested = 0; + } } + if (this.exitWhenComplete) { log.info("exitWhenComplete=" + exitWhenComplete + ": DONE"); done = true; diff --git a/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java b/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java index e4dce860..47baf37d 100644 --- a/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java @@ -358,9 +358,10 @@ private Progress doit() { } } } catch (Throwable t) { + // catch to log the exception, then rethrow. log.error("unexpected exception", t); - ret.abort = true; correct = false; + throw t; } finally { if (correct) { log.debug("DONE"); diff --git a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java index 8116e9a1..c08fc414 100644 --- a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java @@ -86,9 +86,6 @@ import ca.nrc.cadc.db.DBUtil; import ca.nrc.cadc.net.ResourceNotFoundException; import ca.nrc.cadc.net.TransientException; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; import java.net.URI; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -97,11 +94,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.ListIterator; import java.util.Map; -import java.util.Properties; import java.util.UUID; import java.util.concurrent.ExecutionException; import javax.naming.NamingException; @@ -568,6 +563,9 @@ private Progress doit() { } catch (InterruptedException | ExecutionException e) { log.error("SEVERE PROBLEM - ThreadPool harvesting Observations failed: " + e.getMessage()); ret.abort = true; + } catch (Throwable th) { + log.error("unexpected exception", th); + throw th; } finally { timeTransaction = System.currentTimeMillis() - t; log.debug("time to get HarvestState: " + timeState + "ms");