Skip to content

Commit

Permalink
Improve existence-check batching and resolution usability (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
pradh authored Sep 17, 2021
1 parent d0f17a9 commit f384cbc
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 112 deletions.
2 changes: 1 addition & 1 deletion tool/src/main/java/org/datacommons/tool/GenMcf.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Integer call() throws IOException {
}
Processor.Args args = new Processor.Args();
args.doExistenceChecks = parent.doExistenceChecks;
args.doResolution = parent.doResolution;
args.resolutionMode = parent.resolutionMode;
args.verbose = parent.verbose;
args.fileGroup = FileGroup.build(files, spec, delimiter, logger);
args.logCtx = new LogWrapper(Debug.Log.newBuilder(), parent.outputDir.toPath());
Expand Down
2 changes: 1 addition & 1 deletion tool/src/main/java/org/datacommons/tool/Lint.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Integer call() throws IOException {
}
Processor.Args args = new Processor.Args();
args.doExistenceChecks = parent.doExistenceChecks;
args.doResolution = parent.doResolution;
args.resolutionMode = parent.resolutionMode;
args.verbose = parent.verbose;
args.fileGroup = FileGroup.build(files, spec, delimiter, logger);
args.logCtx = new LogWrapper(Debug.Log.newBuilder(), parent.outputDir.toPath());
Expand Down
18 changes: 12 additions & 6 deletions tool/src/main/java/org/datacommons/tool/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,24 @@ class Main {
description =
"Check DCID references to schema nodes against the KG and locally. If set, then "
+ "calls will be made to the Staging API server, and instance MCFs get fully "
+ "loaded into memory.")
+ "loaded into memory. Defaults to true.")
public boolean doExistenceChecks;

// TODO: Default to true after some trials.
// TODO: Default to LOCAL after some trials.
@CommandLine.Option(
names = {"-r", "--resolution"},
defaultValue = "false",
defaultValue = "NONE",
scope = CommandLine.ScopeType.INHERIT,
description = "Resolves local references and generates node DCIDs.")
public boolean doResolution;
description =
"Specifies the mode of resolution to use: ${COMPLETION-CANDIDATES}. For no resolution,"
+ " set NONE. To lookup external IDs (like ISO) in DC, resolve local references "
+ "and generated DCIDs, set FULL. To just resolve local references and generate "
+ "DCIDs, set LOCAL. Note that FULL mode may be slower since it makes "
+ "(batched) DC Recon API calls and two passes over your CSV files. Default to NONE.")
public Processor.ResolutionMode resolutionMode = Processor.ResolutionMode.NONE;

public static void main(String... args) {
System.exit(new CommandLine(new Main()).execute(args));
System.exit(
new CommandLine(new Main()).setCaseInsensitiveEnumValuesAllowed(true).execute(args));
}
}
90 changes: 68 additions & 22 deletions tool/src/main/java/org/datacommons/tool/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class Processor {
// Set only for "genmcf"
private final Map<OutputFileType, Path> outputFiles;
private ExistenceChecker existenceChecker;
private ResolutionMode resolutionMode;
private ExternalIdResolver idResolver;
private final Map<OutputFileType, BufferedWriter> writers = new HashMap<>();
private final List<Mcf.McfGraph> nodesForVariousChecks = new ArrayList<>();
Expand All @@ -47,18 +48,24 @@ public class Processor {
// different things. Input MCFs have schema/StatVars while TMCF/CSVs have stats.
public enum OutputFileType {
// Output MCF where *.mcf inputs get resolved into.
NODES,
INSTANCE_MCF_NODES,
// Output MCF where failed nodes from *.mcf inputs flow into.
FAILED_NODES,
FAILED_INSTANCE_MCF_NODES,
// Output MCF where TMCF/CSV inputs get resolved into.
TABLE_NODES,
TABLE_MCF_NODES,
// Output MCF where TMCF/CSV failed nodes flow into.
FAILED_TABLE_NODES,
FAILED_TABLE_MCF_NODES,
};

public enum ResolutionMode {
NONE,
LOCAL,
FULL,
};

static class Args {
public boolean doExistenceChecks = false;
public boolean doResolution = false;
public ResolutionMode resolutionMode = ResolutionMode.NONE;
public boolean verbose = false;
public FileGroup fileGroup = null;
public Map<OutputFileType, Path> outputFiles = null;
Expand All @@ -71,39 +78,57 @@ public static Integer process(Args args) throws IOException {
Processor processor = new Processor(args);

// Load all the instance MCFs into memory, so we can do existence checks, resolution, etc.
if (args.doExistenceChecks) {
logger.info("Loading Instance MCF files into memory");
} else {
logger.info("Loading and Checking Instance MCF files");
}
processor.processNodes(Mcf.McfType.INSTANCE_MCF);

// Perform existence checks.
if (args.doExistenceChecks) {
logger.info("Checking Instance MCF nodes (with Existence checks)");
// NOTE: If doExistenceChecks is true, we do a checkNodes() call *after* all instance MCF
// files are processed (via processNodes). This is so that the newly added schema, StatVar,
// etc. are known to the Existence Checker first, before existence checks are performed.
processor.checkNodes();
}

if (args.doResolution) {
// Find external IDs from in-memory MCF nodes and CSVs, and map them to DCIDs.
processor.lookupExternalIds();
if (args.resolutionMode != ResolutionMode.NONE) {
if (args.resolutionMode == ResolutionMode.FULL) {
// Find external IDs from in-memory MCF nodes and CSVs, and map them to DCIDs.
processor.lookupExternalIds();
}

// Having looked up the external IDs, resolve the instances.
logger.info("Resolving Instance MCF nodes");
processor.resolveNodes();

// Resolution for table nodes will happen inside processTables().
}

if (!args.fileGroup.getCsvs().isEmpty()) {
// Process all the tables.
if (args.resolutionMode == ResolutionMode.FULL) {
logger.info("Re-loading, Checking and Resolving Table MCF files");
} else if (args.resolutionMode == ResolutionMode.LOCAL) {
logger.info("Loading, Checking and Resolving Table MCF files");
} else {
logger.info("Loading and Checking Table MCF files");
}
processor.processTables();
} else if (args.fileGroup.getTmcfs() != null) {
// Sanity check the TMCF nodes.
logger.info("Loading and Checking Template MCF files");
processor.processNodes(Mcf.McfType.TEMPLATE_MCF);
}

if (args.outputFiles != null) {
processor.closeFiles();
}
} catch (DCTooManyFailuresException | InterruptedException | IOException ex) {
// Regardless of the failures, we will dump the logCtx and exit.
} catch (DCTooManyFailuresException | InterruptedException ex) {
// Only for DCTooManyFailuresException, we will dump the logCtx and exit.
logger.error("Aborting prematurely, see report.json.");
retVal = -1;
}
args.logCtx.persistLog(false);
Expand All @@ -115,10 +140,11 @@ private Processor(Args args) {
this.outputFiles = args.outputFiles;
this.verbose = args.verbose;
this.fileGroup = args.fileGroup;
this.resolutionMode = args.resolutionMode;
if (args.doExistenceChecks) {
existenceChecker = new ExistenceChecker(HttpClient.newHttpClient(), verbose, logCtx);
}
if (args.doResolution) {
if (resolutionMode == ResolutionMode.FULL) {
idResolver = new ExternalIdResolver(HttpClient.newHttpClient(), verbose, logCtx);
}
}
Expand Down Expand Up @@ -154,7 +180,7 @@ private void processNodes(Mcf.McfType type, File file)
} else {
McfChecker.check(n, existenceChecker, logCtx);
}
if (existenceChecker != null || idResolver != null) {
if (existenceChecker != null || resolutionMode != ResolutionMode.NONE) {
nodesForVariousChecks.add(n);
}

Expand Down Expand Up @@ -190,11 +216,11 @@ private void processTables()
if (success) {
logCtx.incrementCounterBy("NumRowSuccesses", 1);
}
if (idResolver != null) {
resolveCommon(g, OutputFileType.TABLE_NODES, OutputFileType.FAILED_TABLE_NODES);
if (resolutionMode != ResolutionMode.NONE) {
resolveCommon(g, OutputFileType.TABLE_MCF_NODES, OutputFileType.FAILED_TABLE_MCF_NODES);
} else {
if (outputFiles != null) {
writeGraph(OutputFileType.TABLE_NODES, g);
writeGraph(OutputFileType.TABLE_MCF_NODES, g);
}
}
numNodesProcessed += g.getNodesCount();
Expand All @@ -205,7 +231,10 @@ private void processTables()
}
}
logger.info(
"Checked CSV {} ({} rows, {} nodes)",
"Checked "
+ (resolutionMode != ResolutionMode.NONE ? "and Resolved " : "")
+ "CSV {} ({}"
+ " rows, {} nodes)",
csvFile.getName(),
numRowsProcessed,
numNodesProcessed);
Expand All @@ -216,7 +245,6 @@ private void processTables()
// Called only when existenceChecker is enabled.
private void checkNodes() throws IOException, InterruptedException, DCTooManyFailuresException {
long numNodesChecked = 0;
logger.info("Performing existence checks");
logCtx.setLocationFile("");
for (Mcf.McfGraph n : nodesForVariousChecks) {
McfChecker.check(n, existenceChecker, logCtx);
Expand All @@ -234,8 +262,8 @@ private void checkNodes() throws IOException, InterruptedException, DCTooManyFai
private void resolveNodes() throws IOException {
resolveCommon(
McfUtil.mergeGraphs(nodesForVariousChecks),
OutputFileType.NODES,
OutputFileType.FAILED_NODES);
OutputFileType.INSTANCE_MCF_NODES,
OutputFileType.FAILED_INSTANCE_MCF_NODES);
}

private void resolveCommon(
Expand All @@ -256,13 +284,25 @@ private void resolveCommon(
}

// Process all the CSV tables to load all external IDs.
private void lookupExternalIds() throws IOException, InterruptedException {
LogWrapper dummyLog = new LogWrapper(Debug.Log.newBuilder(), Path.of("."));
private void lookupExternalIds()
throws IOException, InterruptedException, DCTooManyFailuresException {
LogWrapper dummyLog = new LogWrapper(Debug.Log.newBuilder());
logger.info("Processing External IDs from Instance MCF nodes");
long numNodesProcessed = 0;
for (var g : nodesForVariousChecks) {
for (var idAndNode : g.getNodesMap().entrySet()) {
idResolver.submitNode(idAndNode.getValue());
}
numNodesProcessed += g.getNodesCount();
dummyLog.provideStatus(numNodesProcessed, "nodes processed");
if (dummyLog.loggedTooManyFailures()) {
System.err.println("Too Many Errors ::\n" + dummyLog.dumpLog());
throw new DCTooManyFailuresException("encountered too many failures");
}
}

logger.info("Loading and Processing External IDs from Table MCF files");
long numRowsProcessed = 0;
for (File csvFile : fileGroup.getCsvs()) {
if (verbose) logger.info("Reading external IDs from CSV " + csvFile.getPath());
TmcfCsvParser parser =
Expand All @@ -274,6 +314,12 @@ private void lookupExternalIds() throws IOException, InterruptedException {
for (var idAndNode : g.getNodesMap().entrySet()) {
idResolver.submitNode(idAndNode.getValue());
}
numRowsProcessed++;
dummyLog.provideStatus(numRowsProcessed, "rows processed");
if (dummyLog.loggedTooManyFailures()) {
System.err.println("Too Many Errors ::\n" + dummyLog.dumpLog());
throw new DCTooManyFailuresException("encountered too many failures");
}
}
}
idResolver.drainRemoteCalls();
Expand All @@ -283,7 +329,7 @@ private void writeGraph(OutputFileType type, Mcf.McfGraph graph) throws IOExcept
var writer = writers.getOrDefault(type, null);
if (writer == null) {
var fileString = outputFiles.get(type).toString();
logger.info("Writing to file " + fileString + " of type " + type.name());
logger.info("Opening output file " + fileString + " of type " + type.name());
writer = new BufferedWriter(new FileWriter(fileString));
writers.put(type, writer);
}
Expand Down
2 changes: 1 addition & 1 deletion tool/src/test/java/org/datacommons/tool/GenMcfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void GenMcfTest() throws IOException {
for (File inputFile : inputFiles) {
argsList.add(inputFile.getPath());
}
argsList.add("--resolution");
argsList.add("--resolution=FULL");
argsList.add(
"--output-dir=" + Paths.get(testFolder.getRoot().getPath(), directory.getName()));
String[] args = argsList.toArray(new String[argsList.size()]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,24 @@
"file": "FatalTmcf.tmcf",
"lineNumber": "19"
},
"userMessage": "No definition found for a referenced 'E:' value :: reference: 'E:SVTest->E13', property: 'dcid' node: 'E:SVTest->E3'",
"userMessage": "No definition found for a referenced 'E:' value :: reference: 'E:SVTest->E10', property: 'dcid' node: 'E:SVTest->E3'",
"counterKey": "Sanity_TmcfMissingEntityDef"
}, {
"level": "LEVEL_ERROR",
"location": {
"file": "FatalTmcf.tmcf",
"lineNumber": "19"
},
"userMessage": "No definition found for a referenced 'E:' value :: reference: 'E:SVTest->E10', property: 'dcid' node: 'E:SVTest->E3'",
"counterKey": "Sanity_TmcfMissingEntityDef"
"userMessage": "Column referred to in TMCF is missing from CSV header :: column: 'dcid1', node: 'E:SVTest->E3'",
"counterKey": "Sanity_TmcfMissingColumn"
}, {
"level": "LEVEL_ERROR",
"location": {
"file": "FatalTmcf.tmcf",
"lineNumber": "19"
},
"userMessage": "Column referred to in TMCF is missing from CSV header :: column: 'dcid1', node: 'E:SVTest->E3'",
"counterKey": "Sanity_TmcfMissingColumn"
"userMessage": "No definition found for a referenced 'E:' value :: reference: 'E:SVTest->E13', property: 'dcid' node: 'E:SVTest->E3'",
"counterKey": "Sanity_TmcfMissingEntityDef"
}, {
"level": "LEVEL_ERROR",
"location": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"LEVEL_ERROR": {
"counters": {
"Sanity_MissingOrEmpty_statType": "1",
"Sanity_UnknownStatType": "1",
"Sanity_MissingOrEmpty_dcid": "1",
"Resolution_OrphanLocalReference_parent": "1",
"Resolution_DcidAssignmentFailure_Song": "1",
Expand All @@ -37,14 +36,6 @@
},
"userMessage": "Found a missing or empty property value :: property: 'statType', node: 'SVId', type: 'StatisticalVariable'",
"counterKey": "Sanity_MissingOrEmpty_statType"
}, {
"level": "LEVEL_ERROR",
"location": {
"file": "misc.mcf",
"lineNumber": "15"
},
"userMessage": "Found an unknown statType value :: value: '', node: 'SVId'",
"counterKey": "Sanity_UnknownStatType"
}, {
"level": "LEVEL_ERROR",
"location": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"Sanity_MissingOrEmpty_populationType": "2",
"Sanity_MissingOrEmpty_measuredProperty": "1",
"Sanity_MissingOrEmpty_statType": "1",
"Sanity_UnknownStatType": "1",
"Sanity_NotInitLowerPropName": "1",
"Sanity_InvalidChars_domainIncludes": "1",
"Sanity_UnexpectedPropInProperty": "1",
Expand Down Expand Up @@ -261,14 +260,6 @@
},
"userMessage": "Found a missing or empty property value :: property: 'statType', node: 'dcid:Count_Death_10To12', type: 'StatisticalVariable'",
"counterKey": "Sanity_MissingOrEmpty_statType"
}, {
"level": "LEVEL_ERROR",
"location": {
"file": "AllFileTypes.mcf",
"lineNumber": "77"
},
"userMessage": "Found an unknown statType value :: value: '', node: 'dcid:Count_Death_10To12'",
"counterKey": "Sanity_UnknownStatType"
}, {
"level": "LEVEL_ERROR",
"location": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"Sanity_MissingOrEmpty_populationType": "2",
"Sanity_MissingOrEmpty_measuredProperty": "1",
"Sanity_MissingOrEmpty_statType": "1",
"Sanity_UnknownStatType": "1",
"Sanity_NotInitLowerPropName": "1",
"Sanity_InvalidChars_domainIncludes": "1",
"Sanity_UnexpectedPropInProperty": "1",
Expand Down Expand Up @@ -246,14 +245,6 @@
},
"userMessage": "Found a missing or empty property value :: property: 'statType', node: 'dcid:Count_Death_10To12', type: 'StatisticalVariable'",
"counterKey": "Sanity_MissingOrEmpty_statType"
}, {
"level": "LEVEL_ERROR",
"location": {
"file": "McfOnly.mcf",
"lineNumber": "77"
},
"userMessage": "Found an unknown statType value :: value: '', node: 'dcid:Count_Death_10To12'",
"counterKey": "Sanity_UnknownStatType"
}, {
"level": "LEVEL_ERROR",
"location": {
Expand Down
Loading

0 comments on commit f384cbc

Please sign in to comment.