diff --git a/src/main/java/org/globalbioticinteractions/elton/Elton.java b/src/main/java/org/globalbioticinteractions/elton/Elton.java index 5e76d1f..5543621 100644 --- a/src/main/java/org/globalbioticinteractions/elton/Elton.java +++ b/src/main/java/org/globalbioticinteractions/elton/Elton.java @@ -5,6 +5,7 @@ */ import org.apache.commons.lang.StringUtils; +import org.globalbioticinteractions.elton.cmd.CmdStream; import org.globalbioticinteractions.elton.cmd.CmdDatasets; import org.globalbioticinteractions.elton.cmd.CmdGet; import org.globalbioticinteractions.elton.cmd.CmdInit; @@ -37,6 +38,7 @@ CmdVersion.class, CmdNanoPubs.class, CmdList.class, + CmdStream.class, CmdUpdate.class, CmdInstallManual.class, ManPageGenerator.class, diff --git a/src/main/java/org/globalbioticinteractions/elton/cmd/CmdInteractions.java b/src/main/java/org/globalbioticinteractions/elton/cmd/CmdInteractions.java index 6fb4a1a..31d9862 100644 --- a/src/main/java/org/globalbioticinteractions/elton/cmd/CmdInteractions.java +++ b/src/main/java/org/globalbioticinteractions/elton/cmd/CmdInteractions.java @@ -83,7 +83,7 @@ ) public class CmdInteractions extends CmdTabularWriterParams { - public class TsvWriter implements InteractionWriter, TabularWriter { + public static class TsvWriter implements InteractionWriter, TabularWriter { private final PrintStream out; TsvWriter(PrintStream out) { @@ -92,6 +92,12 @@ public class TsvWriter implements InteractionWriter, TabularWriter { @Override public void write(SpecimenImpl source, InteractType type, SpecimenImpl target, Study study, Dataset dataset) { + Stream valueStream = getValues(source, type, target, study, dataset); + String row = StreamUtil.tsvRowOf(valueStream); + out.println(row); + } + + private static Stream getValues(SpecimenImpl source, InteractType type, SpecimenImpl target, Study study, Dataset dataset) { Stream interactStream = Stream.of(type.getIRI(), type.getLabel()); String sourceOccurrenceId = valueOrEmpty(source, OCCURRENCE_ID); @@ -106,7 +112,7 @@ public void write(SpecimenImpl source, InteractType type, SpecimenImpl target, S String targetCollectionId = valueOrEmpty(target, COLLECTION_ID); String targetInstitutionCode = valueOrEmpty(target, INSTITUTION_CODE); - Stream rowStream = Stream.of( + return Stream.of( Stream.of(source.isSupportingClaim() ? PropertyAndValueDictionary.SUPPORTS : PropertyAndValueDictionary.REFUTES), Stream.of(sourceOccurrenceId, sourceCatalogNumber, sourceCollectionCode, sourceCollectionId, sourceInstitutionCode), StreamUtil.streamOf(source.taxon), @@ -120,67 +126,70 @@ public void write(SpecimenImpl source, InteractType type, SpecimenImpl target, S StreamUtil.streamOf(target.getSampleLocation()), StreamUtil.streamOf(study), CmdUtil.datasetInfo(dataset).stream()).flatMap(x -> x); - String row = StreamUtil.tsvRowOf(rowStream); - out.println(row); } - private String valueOrEmpty(SpecimenImpl source, String key) { + private static String valueOrEmpty(SpecimenImpl source, String key) { String value = source.getProperty(key); return StringUtils.isBlank(value) ? "" : value; } @Override public void writeHeader() { - out.println(StreamUtil.tsvRowOf(Stream.concat(Stream.of( - ARGUMENT_TYPE_ID, - SOURCE_OCCURRENCE_ID, - SOURCE_CATALOG_NUMBER, - SOURCE_COLLECTION_CODE, - SOURCE_COLLECTION_ID, - SOURCE_INSTITUTION_CODE, - SOURCE_TAXON_ID, - SOURCE_TAXON_NAME, - SOURCE_TAXON_RANK, - SOURCE_TAXON_PATH_IDS, - SOURCE_TAXON_PATH, - SOURCE_TAXON_PATH_NAMES, - SOURCE_BODY_PART_ID, - SOURCE_BODY_PART_NAME, - SOURCE_LIFE_STAGE_ID, - SOURCE_LIFE_STAGE_NAME, - SOURCE_SEX_ID, - SOURCE_SEX_NAME, - INTERACTION_TYPE_ID, - INTERACTION_TYPE_NAME, - TARGET_OCCURRENCE_ID, - TARGET_CATALOG_NUMBER, - TARGET_COLLECTION_CODE, - TARGET_COLLECTION_ID, - TARGET_INSTITUTION_CODE, - TARGET_TAXON_ID, - TARGET_TAXON_NAME, - TARGET_TAXON_RANK, - TARGET_TAXON_PATH_IDS, - TARGET_TAXON_PATH, - TARGET_TAXON_PATH_NAMES, - TARGET_BODY_PART_ID, - TARGET_BODY_PART_NAME, - TARGET_LIFE_STAGE_ID, - TARGET_LIFE_STAGE_NAME, - TARGET_SEX_ID, - TARGET_SEX_NAME, - BASIS_OF_RECORD_ID, - BASIS_OF_RECORD_NAME, - EVENT_DATE, - DECIMAL_LATITUDE, - DECIMAL_LONGITUDE, - LOCALITY_ID, - LOCALITY_NAME, - REFERENCE_DOI, - REFERENCE_URL, - REFERENCE_CITATION - - ), StreamUtil.datasetHeaderFields()))); + Stream keys = getKeys(); + out.println(StreamUtil.tsvRowOf(keys)); + } + + private static Stream getKeys() { + return Stream.concat(Stream.of( + ARGUMENT_TYPE_ID, + SOURCE_OCCURRENCE_ID, + SOURCE_CATALOG_NUMBER, + SOURCE_COLLECTION_CODE, + SOURCE_COLLECTION_ID, + SOURCE_INSTITUTION_CODE, + SOURCE_TAXON_ID, + SOURCE_TAXON_NAME, + SOURCE_TAXON_RANK, + SOURCE_TAXON_PATH_IDS, + SOURCE_TAXON_PATH, + SOURCE_TAXON_PATH_NAMES, + SOURCE_BODY_PART_ID, + SOURCE_BODY_PART_NAME, + SOURCE_LIFE_STAGE_ID, + SOURCE_LIFE_STAGE_NAME, + SOURCE_SEX_ID, + SOURCE_SEX_NAME, + INTERACTION_TYPE_ID, + INTERACTION_TYPE_NAME, + TARGET_OCCURRENCE_ID, + TARGET_CATALOG_NUMBER, + TARGET_COLLECTION_CODE, + TARGET_COLLECTION_ID, + TARGET_INSTITUTION_CODE, + TARGET_TAXON_ID, + TARGET_TAXON_NAME, + TARGET_TAXON_RANK, + TARGET_TAXON_PATH_IDS, + TARGET_TAXON_PATH, + TARGET_TAXON_PATH_NAMES, + TARGET_BODY_PART_ID, + TARGET_BODY_PART_NAME, + TARGET_LIFE_STAGE_ID, + TARGET_LIFE_STAGE_NAME, + TARGET_SEX_ID, + TARGET_SEX_NAME, + BASIS_OF_RECORD_ID, + BASIS_OF_RECORD_NAME, + EVENT_DATE, + DECIMAL_LATITUDE, + DECIMAL_LONGITUDE, + LOCALITY_ID, + LOCALITY_NAME, + REFERENCE_DOI, + REFERENCE_URL, + REFERENCE_CITATION + + ), StreamUtil.datasetHeaderFields()); } } diff --git a/src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java b/src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java new file mode 100644 index 0000000..912ff03 --- /dev/null +++ b/src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java @@ -0,0 +1,73 @@ +package org.globalbioticinteractions.elton.cmd; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; + +@CommandLine.Command( + name = "stream", + description = "stream interactions associated with dataset configuration provided by globi.json line-json as input.\n" + + "example input:" + + "{ \"namespace\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"citation\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"format\": \"dwca\", \"url\": \"https://linker.bio/hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\" }\n" +) + +public class CmdStream extends CmdDefaultParams { + + private final static Logger LOG = LoggerFactory.getLogger(CmdStream.class); + + + @Override + public void run() { + + BufferedReader reader = IOUtils.buffer(new InputStreamReader(getStdin(), StandardCharsets.UTF_8)); + AtomicBoolean isFirst = new AtomicBoolean(true); + try { + String line; + while ((line = reader.readLine()) != null) { + try { + JsonNode jsonNode = new ObjectMapper().readTree(line); + if (jsonNode.has("namespace")) { + String namespace = jsonNode.get("namespace").asText(); + if (StringUtils.isNotBlank(namespace)) { + try { + StreamingNamespaceConfigHandler namespaceHandler = new StreamingNamespaceConfigHandler( + jsonNode, + this.createInputStreamFactory(), + this.getCacheDir(), + this.getStderr(), + this.getStdout() + ); + namespaceHandler.setShouldWriteHeader(isFirst.get()); + namespaceHandler.onNamespace(namespace); + isFirst.set(false); + } catch (Exception e) { + LOG.error("failed to add dataset associated with namespace [" + namespace + "]", e); + } finally { + FileUtils.forceDelete(new File(this.getCacheDir())); + } + } + } + } catch (JsonProcessingException e) { + // ignore non-json lines + } + } + } catch (IOException ex) { + LOG.error("failed to read from stdin", ex); + } + + } + +} diff --git a/src/main/java/org/globalbioticinteractions/elton/cmd/CmdUtil.java b/src/main/java/org/globalbioticinteractions/elton/cmd/CmdUtil.java index a70cc86..6f5c882 100644 --- a/src/main/java/org/globalbioticinteractions/elton/cmd/CmdUtil.java +++ b/src/main/java/org/globalbioticinteractions/elton/cmd/CmdUtil.java @@ -14,6 +14,7 @@ import org.eol.globi.util.ResourceServiceLocal; import org.eol.globi.util.ResourceServiceLocalAndRemote; import org.globalbioticinteractions.cache.Cache; +import org.globalbioticinteractions.cache.CacheFactory; import org.globalbioticinteractions.cache.CacheLocalReadonly; import org.globalbioticinteractions.cache.CacheProxy; import org.globalbioticinteractions.dataset.Dataset; @@ -61,19 +62,24 @@ static DatasetRegistry createDataFinderLoggingCaching( String namespace, String cacheDir, InputStreamFactory factory) { - return new DatasetRegistryWithCache(new DatasetRegistryLogger(registry, cacheDir), dataset -> { - ResourceService remote = new ResourceServiceLocalAndRemote(factory); - ResourceService local = new ResourceServiceLocal(factory); - Cache pullThroughCache = new CachePullThroughPrestonStore(namespace, cacheDir, remote, new StatementListener() { + CacheFactory cacheFactory = createCacheFactory(namespace, cacheDir, factory); + return new DatasetRegistryWithCache(new DatasetRegistryLogger(registry, cacheDir), cacheFactory); + } - @Override - public void on(Quad quad) { - // ignore printing quads for now - } - }); - CacheLocalReadonly readOnlyCache = new CacheLocalReadonly(namespace, cacheDir, local); - return new CacheProxy(Arrays.asList(pullThroughCache, readOnlyCache)); - }); + public static CacheFactory createCacheFactory(String namespace, String cacheDir, InputStreamFactory factory) { + return dataset -> { + ResourceService remote = new ResourceServiceLocalAndRemote(factory); + ResourceService local = new ResourceServiceLocal(factory); + Cache pullThroughCache = new CachePullThroughPrestonStore(namespace, cacheDir, remote, new StatementListener() { + + @Override + public void on(Quad quad) { + // ignore printing quads for now + } + }); + CacheLocalReadonly readOnlyCache = new CacheLocalReadonly(namespace, cacheDir, local); + return new CacheProxy(Arrays.asList(pullThroughCache, readOnlyCache)); + }; } public static List datasetInfo(Dataset dataset) { diff --git a/src/main/java/org/globalbioticinteractions/elton/cmd/StreamingNamespaceConfigHandler.java b/src/main/java/org/globalbioticinteractions/elton/cmd/StreamingNamespaceConfigHandler.java new file mode 100644 index 0000000..8e17700 --- /dev/null +++ b/src/main/java/org/globalbioticinteractions/elton/cmd/StreamingNamespaceConfigHandler.java @@ -0,0 +1,91 @@ +package org.globalbioticinteractions.elton.cmd; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.commons.io.IOUtils; +import org.eol.globi.data.NodeFactory; +import org.eol.globi.data.StudyImporterException; +import org.eol.globi.util.DatasetImportUtil; +import org.eol.globi.util.InputStreamFactory; +import org.globalbioticinteractions.cache.Cache; +import org.globalbioticinteractions.cache.CacheFactory; +import org.globalbioticinteractions.dataset.Dataset; +import org.globalbioticinteractions.dataset.DatasetWithCache; +import org.globalbioticinteractions.dataset.DatasetWithResourceMapping; +import org.globalbioticinteractions.elton.util.DatasetProcessorForTSV; +import org.globalbioticinteractions.elton.util.NamespaceHandler; +import org.globalbioticinteractions.elton.util.NodeFactoryForDataset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; + +class StreamingNamespaceConfigHandler implements NamespaceHandler { + private final static Logger LOG = LoggerFactory.getLogger(StreamingNamespaceConfigHandler.class); + private final String cacheDir; + private final PrintStream stderr; + private final PrintStream stdout; + + private InputStreamFactory factory; + private final JsonNode config; + private boolean shouldWriteHeader; + + public StreamingNamespaceConfigHandler(JsonNode jsonNode, + InputStreamFactoryLogging inputStreamFactory, + String cacheDir, + PrintStream stderr, + PrintStream stdout) { + this.factory = inputStreamFactory; + this.cacheDir = cacheDir; + this.stderr = stderr; + this.stdout = stdout; + this.config = jsonNode; + } + + @Override + public void onNamespace(String namespace) throws Exception { + stderr.println("tracking [" + namespace + "]..."); + CacheFactory cacheFactory = CmdUtil.createCacheFactory( + namespace, + cacheDir, + factory + ); + + Dataset dataset = new DatasetWithResourceMapping( + namespace, + URI.create(config.get("url").asText()), + cacheFactory.cacheFor(null) + ); + dataset.setConfig(config); + Cache cache = cacheFactory.cacheFor(dataset); + DatasetWithCache datasetWithCache = new DatasetWithCache(dataset, cache); + + CmdInteractions.TsvWriter writer = new CmdInteractions.TsvWriter(stdout); + if (shouldWriteHeader) { + writer.writeHeader(); + } + + NodeFactory factory = new NodeFactoryForDataset(writer, new DatasetProcessorForTSV()); + + factory.getOrCreateDataset(dataset); + try { + DatasetImportUtil.importDataset( + null, + datasetWithCache, + factory, + null); + stderr.println("done."); + } catch (StudyImporterException ex) { + LOG.error("tracking of [" + namespace + "] failed.", ex); + stderr.println("failed with [ " + ex.getMessage() + "]."); + ex.printStackTrace(stderr); + } + + IOUtils.write("wrote [" + namespace + "]\n", stderr, StandardCharsets.UTF_8); + } + + public void setShouldWriteHeader(boolean shouldWriteHeader) { + this.shouldWriteHeader = shouldWriteHeader; + } +} diff --git a/src/main/java/org/globalbioticinteractions/elton/store/CachePullThroughPrestonStore.java b/src/main/java/org/globalbioticinteractions/elton/store/CachePullThroughPrestonStore.java index 68396d0..60dee49 100644 --- a/src/main/java/org/globalbioticinteractions/elton/store/CachePullThroughPrestonStore.java +++ b/src/main/java/org/globalbioticinteractions/elton/store/CachePullThroughPrestonStore.java @@ -80,12 +80,15 @@ public InputStream retrieve(URI resourceURI) throws IOException { DereferencerContentAddressed derefCas = new DereferencerContentAddressed(deref, blobStore); - - IRI dereferenced = derefCas.get(RefNodeFactory.toIRI(resourceURI)); - - streamProvenance(resourceURI, dereferenced, listener); - recordProvenance(resourceURI, keyToPath, dereferenced); - return blobStore.get(dereferenced); + try { + IRI dereferenced = derefCas.get(RefNodeFactory.toIRI(resourceURI)); + + streamProvenance(resourceURI, dereferenced, listener); + recordProvenance(resourceURI, keyToPath, dereferenced); + return blobStore.get(dereferenced); + } catch (IOException ex) { + throw new IOException("failed to retrieve [" + resourceURI + "]", ex); + } } private void streamProvenance(URI resourceURI, IRI dereferenced, StatementListener statementListener) {