
Commit 9534e4b

introducing [elton stream] command to enable streaming of interactions from a preston archive; related to Big-Bee-Network/bif#1; fyi @zedomel @seltmann
Jorrit Poelen committed Jul 26, 2024
1 parent 7f9999a commit 9534e4b
Showing 6 changed files with 257 additions and 73 deletions.
2 changes: 2 additions & 0 deletions src/main/java/org/globalbioticinteractions/elton/Elton.java
@@ -5,6 +5,7 @@
*/

import org.apache.commons.lang.StringUtils;
import org.globalbioticinteractions.elton.cmd.CmdStream;
import org.globalbioticinteractions.elton.cmd.CmdDatasets;
import org.globalbioticinteractions.elton.cmd.CmdGet;
import org.globalbioticinteractions.elton.cmd.CmdInit;
@@ -37,6 +38,7 @@
CmdVersion.class,
CmdNanoPubs.class,
CmdList.class,
CmdStream.class,
CmdUpdate.class,
CmdInstallManual.class,
ManPageGenerator.class,
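For readers new to picocli: registering CmdStream.class in the subcommand list above is all it takes to expose the new verb. Below is a minimal, self-contained sketch of the same wiring, assuming picocli 4.x; ToyMain and ToyStream are hypothetical stand-ins for Elton and CmdStream, not the project's actual classes.

import picocli.CommandLine;
import picocli.CommandLine.Command;

// minimal sketch of picocli subcommand registration, assuming picocli 4.x;
// ToyMain/ToyStream are hypothetical stand-ins for Elton and CmdStream
@Command(name = "toy", subcommands = {ToyStream.class})
class ToyMain implements Runnable {
    @Override
    public void run() {
        System.out.println("usage: toy stream");
    }
}

@Command(name = "stream", description = "toy stand-in for [elton stream]")
class ToyStream implements Runnable {
    @Override
    public void run() {
        System.out.println("streaming...");
    }
}

public class SubcommandSketch {
    public static void main(String[] args) {
        // `toy stream` dispatches to ToyStream.run()
        int exitCode = new CommandLine(new ToyMain()).execute("stream");
        System.exit(exitCode);
    }
}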
src/main/java/org/globalbioticinteractions/elton/cmd/CmdInteractions.java
@@ -83,7 +83,7 @@
)
public class CmdInteractions extends CmdTabularWriterParams {

public static class TsvWriter implements InteractionWriter, TabularWriter {
private final PrintStream out;

TsvWriter(PrintStream out) {
@@ -92,6 +92,12 @@ public class TsvWriter implements InteractionWriter, TabularWriter {

@Override
public void write(SpecimenImpl source, InteractType type, SpecimenImpl target, Study study, Dataset dataset) {
Stream<String> valueStream = getValues(source, type, target, study, dataset);
String row = StreamUtil.tsvRowOf(valueStream);
out.println(row);
}

private static Stream<String> getValues(SpecimenImpl source, InteractType type, SpecimenImpl target, Study study, Dataset dataset) {
Stream<String> interactStream = Stream.of(type.getIRI(), type.getLabel());

String sourceOccurrenceId = valueOrEmpty(source, OCCURRENCE_ID);
@@ -106,7 +112,7 @@ public void write(SpecimenImpl source, InteractType type, SpecimenImpl target, S
String targetCollectionId = valueOrEmpty(target, COLLECTION_ID);
String targetInstitutionCode = valueOrEmpty(target, INSTITUTION_CODE);

return Stream.of(
Stream.of(source.isSupportingClaim() ? PropertyAndValueDictionary.SUPPORTS : PropertyAndValueDictionary.REFUTES),
Stream.of(sourceOccurrenceId, sourceCatalogNumber, sourceCollectionCode, sourceCollectionId, sourceInstitutionCode),
StreamUtil.streamOf(source.taxon),
@@ -120,67 +126,70 @@ public void write(SpecimenImpl source, InteractType type, SpecimenImpl target, S
StreamUtil.streamOf(target.getSampleLocation()),
StreamUtil.streamOf(study),
CmdUtil.datasetInfo(dataset).stream()).flatMap(x -> x);
}

private static String valueOrEmpty(SpecimenImpl source, String key) {
String value = source.getProperty(key);
return StringUtils.isBlank(value) ? "" : value;
}

@Override
public void writeHeader() {
Stream<String> keys = getKeys();
out.println(StreamUtil.tsvRowOf(keys));
}

private static Stream<String> getKeys() {
return Stream.concat(Stream.of(
ARGUMENT_TYPE_ID,
SOURCE_OCCURRENCE_ID,
SOURCE_CATALOG_NUMBER,
SOURCE_COLLECTION_CODE,
SOURCE_COLLECTION_ID,
SOURCE_INSTITUTION_CODE,
SOURCE_TAXON_ID,
SOURCE_TAXON_NAME,
SOURCE_TAXON_RANK,
SOURCE_TAXON_PATH_IDS,
SOURCE_TAXON_PATH,
SOURCE_TAXON_PATH_NAMES,
SOURCE_BODY_PART_ID,
SOURCE_BODY_PART_NAME,
SOURCE_LIFE_STAGE_ID,
SOURCE_LIFE_STAGE_NAME,
SOURCE_SEX_ID,
SOURCE_SEX_NAME,
INTERACTION_TYPE_ID,
INTERACTION_TYPE_NAME,
TARGET_OCCURRENCE_ID,
TARGET_CATALOG_NUMBER,
TARGET_COLLECTION_CODE,
TARGET_COLLECTION_ID,
TARGET_INSTITUTION_CODE,
TARGET_TAXON_ID,
TARGET_TAXON_NAME,
TARGET_TAXON_RANK,
TARGET_TAXON_PATH_IDS,
TARGET_TAXON_PATH,
TARGET_TAXON_PATH_NAMES,
TARGET_BODY_PART_ID,
TARGET_BODY_PART_NAME,
TARGET_LIFE_STAGE_ID,
TARGET_LIFE_STAGE_NAME,
TARGET_SEX_ID,
TARGET_SEX_NAME,
BASIS_OF_RECORD_ID,
BASIS_OF_RECORD_NAME,
EVENT_DATE,
DECIMAL_LATITUDE,
DECIMAL_LONGITUDE,
LOCALITY_ID,
LOCALITY_NAME,
REFERENCE_DOI,
REFERENCE_URL,
REFERENCE_CITATION

), StreamUtil.datasetHeaderFields());
}
}
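
Two refactorings in this file matter for the streaming feature: TsvWriter becomes a static nested class so other commands can instantiate it directly, and the row and header assembly move into static getValues()/getKeys() helpers. The row-assembly idiom itself, composing column groups as streams and flattening them, can be seen in isolation in the sketch below; tsvRowOf is a hypothetical stand-in for StreamUtil.tsvRowOf, and the column values are made up.

import java.util.stream.Collectors;
import java.util.stream.Stream;

// minimal sketch of the row-assembly idiom used by getValues(): column
// groups are composed as streams and flattened into one TSV row;
// tsvRowOf is a hypothetical stand-in for StreamUtil.tsvRowOf
public class TsvRowSketch {

    static String tsvRowOf(Stream<String> values) {
        return values.collect(Collectors.joining("\t"));
    }

    public static void main(String[] args) {
        Stream<String> row = Stream.of(
                Stream.of("supports"),                             // claim type
                Stream.of("occurrenceId:123", "catalogNumber:42"), // source columns
                Stream.of("interactionTypeId", "eats"),            // interaction type columns
                Stream.of("occurrenceId:456")                      // target columns
        ).flatMap(x -> x);
        System.out.println(tsvRowOf(row));
    }
}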

73 changes: 73 additions & 0 deletions src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java
@@ -0,0 +1,73 @@
package org.globalbioticinteractions.elton.cmd;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

@CommandLine.Command(
name = "stream",
description = "stream interactions associated with dataset configuration provided by globi.json line-json as input.\n" +
"example input:" +
"{ \"namespace\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"citation\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"format\": \"dwca\", \"url\": \"https://linker.bio/hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\" }\n"
)

public class CmdStream extends CmdDefaultParams {

private final static Logger LOG = LoggerFactory.getLogger(CmdStream.class);


@Override
public void run() {

BufferedReader reader = IOUtils.buffer(new InputStreamReader(getStdin(), StandardCharsets.UTF_8));
AtomicBoolean isFirst = new AtomicBoolean(true);
try {
String line;
while ((line = reader.readLine()) != null) {
try {
JsonNode jsonNode = new ObjectMapper().readTree(line);
if (jsonNode.has("namespace")) {
String namespace = jsonNode.get("namespace").asText();
if (StringUtils.isNotBlank(namespace)) {
try {
StreamingNamespaceConfigHandler namespaceHandler = new StreamingNamespaceConfigHandler(
jsonNode,
this.createInputStreamFactory(),
this.getCacheDir(),
this.getStderr(),
this.getStdout()
);
namespaceHandler.setShouldWriteHeader(isFirst.get());
namespaceHandler.onNamespace(namespace);
isFirst.set(false);
} catch (Exception e) {
LOG.error("failed to add dataset associated with namespace [" + namespace + "]", e);
} finally {
FileUtils.forceDelete(new File(this.getCacheDir()));
}
}
}
} catch (JsonProcessingException e) {
// ignore non-json lines
}
}
} catch (IOException ex) {
LOG.error("failed to read from stdin", ex);
}

}

}
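
The loop above defines a simple stdin protocol: one JSON dataset configuration per line, lines that fail to parse are skipped silently, and only the first streamed namespace triggers a TSV header. Here is a standalone sketch of that protocol, reading from an inline string instead of stdin; the namespaces and urls are made up.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.BufferedReader;
import java.io.StringReader;

// standalone sketch of the [elton stream] stdin protocol: one JSON object
// per line, non-json lines skipped, header written only once
public class LineJsonLoopSketch {
    public static void main(String[] args) throws Exception {
        String input =
                "{ \"namespace\": \"ns-one\", \"format\": \"dwca\", \"url\": \"https://example.org/one\" }\n" +
                "not json, silently ignored\n" +
                "{ \"namespace\": \"ns-two\", \"format\": \"dwca\", \"url\": \"https://example.org/two\" }\n";
        BufferedReader reader = new BufferedReader(new StringReader(input));
        ObjectMapper mapper = new ObjectMapper();
        boolean isFirst = true;
        String line;
        while ((line = reader.readLine()) != null) {
            try {
                JsonNode jsonNode = mapper.readTree(line);
                if (jsonNode.has("namespace")) {
                    if (isFirst) {
                        System.out.println("header");
                        isFirst = false;
                    }
                    System.out.println("interactions for [" + jsonNode.get("namespace").asText() + "]");
                }
            } catch (JsonProcessingException e) {
                // ignore non-json lines, as CmdStream does
            }
        }
    }
}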
30 changes: 18 additions & 12 deletions src/main/java/org/globalbioticinteractions/elton/cmd/CmdUtil.java
@@ -14,6 +14,7 @@
import org.eol.globi.util.ResourceServiceLocal;
import org.eol.globi.util.ResourceServiceLocalAndRemote;
import org.globalbioticinteractions.cache.Cache;
import org.globalbioticinteractions.cache.CacheFactory;
import org.globalbioticinteractions.cache.CacheLocalReadonly;
import org.globalbioticinteractions.cache.CacheProxy;
import org.globalbioticinteractions.dataset.Dataset;
@@ -61,19 +62,24 @@ static DatasetRegistry createDataFinderLoggingCaching(
String namespace,
String cacheDir,
InputStreamFactory factory) {
CacheFactory cacheFactory = createCacheFactory(namespace, cacheDir, factory);
return new DatasetRegistryWithCache(new DatasetRegistryLogger(registry, cacheDir), cacheFactory);
}

public static CacheFactory createCacheFactory(String namespace, String cacheDir, InputStreamFactory factory) {
return dataset -> {
ResourceService remote = new ResourceServiceLocalAndRemote(factory);
ResourceService local = new ResourceServiceLocal(factory);
Cache pullThroughCache = new CachePullThroughPrestonStore(namespace, cacheDir, remote, new StatementListener() {

@Override
public void on(Quad quad) {
// ignore printing quads for now
}
});
CacheLocalReadonly readOnlyCache = new CacheLocalReadonly(namespace, cacheDir, local);
return new CacheProxy(Arrays.asList(pullThroughCache, readOnlyCache));
};
}
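
Extracting createCacheFactory lets CmdStream reuse the same wiring: a pull-through Preston store for remote resources, proxied with a read-only local cache. Below is a toy sketch of the proxy idea, consulting delegates in order until one resolves; ToyCache is a hypothetical interface, not the real globalbioticinteractions Cache API.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// toy sketch of the CacheProxy idea: consult delegate caches in order and
// return the first hit; ToyCache is hypothetical, not the real Cache API
interface ToyCache {
    Optional<String> resolve(String resourceName);
}

public class ToyCacheProxy implements ToyCache {
    private final List<ToyCache> delegates;

    ToyCacheProxy(List<ToyCache> delegates) {
        this.delegates = delegates;
    }

    @Override
    public Optional<String> resolve(String resourceName) {
        return delegates.stream()
                .map(cache -> cache.resolve(resourceName))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

    public static void main(String[] args) {
        // mirrors Arrays.asList(pullThroughCache, readOnlyCache) above:
        // remote-capable cache first, local read-only cache second
        ToyCache pullThrough = name -> name.startsWith("https:")
                ? Optional.of("fetched [" + name + "]") : Optional.empty();
        ToyCache readOnlyLocal = name -> Optional.of("cached [" + name + "]");
        ToyCache proxy = new ToyCacheProxy(Arrays.asList(pullThrough, readOnlyLocal));
        System.out.println(proxy.resolve("https://example.org/globi.json"));
        System.out.println(proxy.resolve("globi.json"));
    }
}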

public static List<String> datasetInfo(Dataset dataset) {
91 changes: 91 additions & 0 deletions src/main/java/org/globalbioticinteractions/elton/cmd/StreamingNamespaceConfigHandler.java
@@ -0,0 +1,91 @@
package org.globalbioticinteractions.elton.cmd;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.io.IOUtils;
import org.eol.globi.data.NodeFactory;
import org.eol.globi.data.StudyImporterException;
import org.eol.globi.util.DatasetImportUtil;
import org.eol.globi.util.InputStreamFactory;
import org.globalbioticinteractions.cache.Cache;
import org.globalbioticinteractions.cache.CacheFactory;
import org.globalbioticinteractions.dataset.Dataset;
import org.globalbioticinteractions.dataset.DatasetWithCache;
import org.globalbioticinteractions.dataset.DatasetWithResourceMapping;
import org.globalbioticinteractions.elton.util.DatasetProcessorForTSV;
import org.globalbioticinteractions.elton.util.NamespaceHandler;
import org.globalbioticinteractions.elton.util.NodeFactoryForDataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class StreamingNamespaceConfigHandler implements NamespaceHandler {
private final static Logger LOG = LoggerFactory.getLogger(StreamingNamespaceConfigHandler.class);
private final String cacheDir;
private final PrintStream stderr;
private final PrintStream stdout;

private InputStreamFactory factory;
private final JsonNode config;
private boolean shouldWriteHeader;

public StreamingNamespaceConfigHandler(JsonNode jsonNode,
InputStreamFactoryLogging inputStreamFactory,
String cacheDir,
PrintStream stderr,
PrintStream stdout) {
this.factory = inputStreamFactory;
this.cacheDir = cacheDir;
this.stderr = stderr;
this.stdout = stdout;
this.config = jsonNode;
}

@Override
public void onNamespace(String namespace) throws Exception {
stderr.println("tracking [" + namespace + "]...");
CacheFactory cacheFactory = CmdUtil.createCacheFactory(
namespace,
cacheDir,
factory
);

Dataset dataset = new DatasetWithResourceMapping(
namespace,
URI.create(config.get("url").asText()),
cacheFactory.cacheFor(null)
);
dataset.setConfig(config);
Cache cache = cacheFactory.cacheFor(dataset);
DatasetWithCache datasetWithCache = new DatasetWithCache(dataset, cache);

CmdInteractions.TsvWriter writer = new CmdInteractions.TsvWriter(stdout);
if (shouldWriteHeader) {
writer.writeHeader();
}

NodeFactory nodeFactory = new NodeFactoryForDataset(writer, new DatasetProcessorForTSV());

nodeFactory.getOrCreateDataset(dataset);
try {
DatasetImportUtil.importDataset(
null,
datasetWithCache,
nodeFactory,
null);
stderr.println("done.");
} catch (StudyImporterException ex) {
LOG.error("tracking of [" + namespace + "] failed.", ex);
stderr.println("failed with [ " + ex.getMessage() + "].");
ex.printStackTrace(stderr);
}

IOUtils.write("wrote [" + namespace + "]\n", stderr, StandardCharsets.UTF_8);
}

public void setShouldWriteHeader(boolean shouldWriteHeader) {
this.shouldWriteHeader = shouldWriteHeader;
}
}
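
Putting it together, onNamespace() announces progress on stderr, builds a TSV writer on stdout, writes the header only when CmdStream marks the namespace as first, and then runs the import. A toy sketch of that shape follows; ToyTsvWriter and the sample row stand in for CmdInteractions.TsvWriter and DatasetImportUtil.importDataset(...).

import java.io.PrintStream;

// toy sketch of the onNamespace() flow above: announce on stderr, build a
// TSV writer on stdout, write the header only when asked, then import;
// ToyTsvWriter and the sample row are hypothetical stand-ins
public class OnNamespaceFlowSketch {

    static class ToyTsvWriter {
        private final PrintStream out;
        ToyTsvWriter(PrintStream out) { this.out = out; }
        void writeHeader() { out.println("source\tinteraction\ttarget"); }
        void write(String source, String interaction, String target) {
            out.println(source + "\t" + interaction + "\t" + target);
        }
    }

    static void onNamespace(String namespace, boolean shouldWriteHeader, PrintStream stdout, PrintStream stderr) {
        stderr.println("tracking [" + namespace + "]...");
        ToyTsvWriter writer = new ToyTsvWriter(stdout);
        if (shouldWriteHeader) {
            writer.writeHeader();
        }
        // stand-in for the dataset import that streams real interactions
        writer.write("Apis mellifera", "visitsFlowersOf", "Helianthus annuus");
        stderr.println("done.");
    }

    public static void main(String[] args) {
        onNamespace("ns-one", true, System.out, System.err);
        onNamespace("ns-two", false, System.out, System.err);
    }
}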
