From 9e23b57e8f233d139d0bc61385377323d6e5831d Mon Sep 17 00:00:00 2001 From: Kathy Tran Date: Mon, 11 Dec 2023 14:35:58 -0500 Subject: [PATCH] Ingest broad metrics using apache csv --- THIRD-PARTY-LICENSES.txt | 18 +- metricsaggregator/pom.xml | 14 + .../client/cli/CommandLineArgs.java | 43 ++- .../client/cli/MetricsAggregatorClient.java | 54 ++- .../client/cli/TerraMetricsSubmitter.java | 326 ++++++++++++++++++ .../src/main/resources/logback.xml | 2 +- .../client/cli/TerraMetricsSubmitterTest.java | 40 +++ pom.xml | 11 +- topicgenerator/pom.xml | 1 - utils/pom.xml | 5 + .../utils/DockstoreApiClientUtils.java | 16 + 11 files changed, 496 insertions(+), 34 deletions(-) create mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java create mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitterTest.java create mode 100644 utils/src/main/java/io/dockstore/utils/DockstoreApiClientUtils.java diff --git a/THIRD-PARTY-LICENSES.txt b/THIRD-PARTY-LICENSES.txt index eefa08d2..32349b1a 100644 --- a/THIRD-PARTY-LICENSES.txt +++ b/THIRD-PARTY-LICENSES.txt @@ -131,10 +131,10 @@ Lists of 417 third-party dependencies. (The Apache Software License, Version 2.0) docker-java-core (com.github.docker-java:docker-java-core:3.3.0 - https://github.com/docker-java/docker-java) (The Apache Software License, Version 2.0) docker-java-transport (com.github.docker-java:docker-java-transport:3.3.0 - https://github.com/docker-java/docker-java) (The Apache Software License, Version 2.0) docker-java-transport-httpclient5 (com.github.docker-java:docker-java-transport-httpclient5:3.3.0 - https://github.com/docker-java/docker-java) - (Apache Software License, Version 2.0) dockstore-common (io.dockstore:dockstore-common:1.15.0-alpha.13 - no url defined) - (Apache Software License, Version 2.0) dockstore-integration-testing (io.dockstore:dockstore-integration-testing:1.15.0-alpha.13 - no url defined) - (Apache Software License, Version 2.0) dockstore-language-plugin-parent (io.dockstore:dockstore-language-plugin-parent:1.15.0-alpha.13 - no url defined) - (Apache Software License, Version 2.0) dockstore-webservice (io.dockstore:dockstore-webservice:1.15.0-alpha.13 - no url defined) + (Apache Software License, Version 2.0) dockstore-common (io.dockstore:dockstore-common:1.15.0-SNAPSHOT - no url defined) + (Apache Software License, Version 2.0) dockstore-integration-testing (io.dockstore:dockstore-integration-testing:1.15.0-SNAPSHOT - no url defined) + (Apache Software License, Version 2.0) dockstore-language-plugin-parent (io.dockstore:dockstore-language-plugin-parent:1.15.0-SNAPSHOT - no url defined) + (Apache Software License, Version 2.0) dockstore-webservice (io.dockstore:dockstore-webservice:1.15.0-SNAPSHOT - no url defined) (Apache License 2.0) Dropwizard (io.dropwizard:dropwizard-core:4.0.2 - http://www.dropwizard.io/4.0.2/dropwizard-bom/dropwizard-dependencies/dropwizard-parent/dropwizard-core) (Apache License 2.0) Dropwizard Asset Bundle (io.dropwizard:dropwizard-assets:4.0.2 - http://www.dropwizard.io/4.0.2/dropwizard-bom/dropwizard-dependencies/dropwizard-parent/dropwizard-assets) (Apache License 2.0) Dropwizard Authentication (io.dropwizard:dropwizard-auth:4.0.2 - http://www.dropwizard.io/4.0.2/dropwizard-bom/dropwizard-dependencies/dropwizard-parent/dropwizard-auth) @@ -309,9 +309,9 @@ Lists of 417 third-party dependencies. (MIT License) liquibase-slf4j (com.mattbertolini:liquibase-slf4j:5.0.0 - https://github.com/mattbertolini/liquibase-slf4j) (Apache License 2.0) localstack-utils (cloud.localstack:localstack-utils:0.2.22 - http://localstack.cloud) (Apache Software Licenses) Log4j Implemented Over SLF4J (org.slf4j:log4j-over-slf4j:2.0.9 - http://www.slf4j.org) - (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Access Module (ch.qos.logback:logback-access:1.4.11 - http://logback.qos.ch/logback-access) - (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.4.11 - http://logback.qos.ch/logback-classic) - (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.4.11 - http://logback.qos.ch/logback-core) + (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Access Module (ch.qos.logback:logback-access:1.4.12 - http://logback.qos.ch/logback-access) + (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.4.12 - http://logback.qos.ch/logback-classic) + (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.4.12 - http://logback.qos.ch/logback-core) (Apache License, Version 2.0) (MIT License) Logstash Logback Encoder (net.logstash.logback:logstash-logback-encoder:4.11 - https://github.com/logstash/logstash-logback-encoder) (Apache License, Version 2.0) Lucene Core (org.apache.lucene:lucene-core:8.7.0 - https://lucene.apache.org/lucene-parent/lucene-core) (MIT) mbknor-jackson-jsonSchema (com.kjetland:mbknor-jackson-jsonschema_2.12:1.0.34 - https://github.com/mbknor/mbknor-jackson-jsonSchema) @@ -354,7 +354,7 @@ Lists of 417 third-party dependencies. (Apache License, Version 2.0) Objenesis (org.objenesis:objenesis:3.2 - http://objenesis.org/objenesis) (The Apache Software License, Version 2.0) okhttp (com.squareup.okhttp3:okhttp:4.10.0 - https://square.github.io/okhttp/) (The Apache Software License, Version 2.0) okio (com.squareup.okio:okio-jvm:3.0.0 - https://github.com/square/okio/) - (Apache Software License, Version 2.0) openapi-java-client (io.dockstore:openapi-java-client:1.15.0-alpha.13 - no url defined) + (Apache Software License, Version 2.0) openapi-java-client (io.dockstore:openapi-java-client:1.15.0-SNAPSHOT - no url defined) (The Apache License, Version 2.0) OpenCensus (io.opencensus:opencensus-api:0.31.0 - https://github.com/census-instrumentation/opencensus-java) (Apache 2) opencsv (com.opencsv:opencsv:5.7.1 - http://opencsv.sf.net) (Apache 2.0) optics (io.circe:circe-optics_2.13:0.14.1 - https://github.com/circe/circe-optics) @@ -395,7 +395,7 @@ Lists of 417 third-party dependencies. (Apache License 2.0) swagger-core-jakarta (io.swagger.core.v3:swagger-core-jakarta:2.2.15 - https://github.com/swagger-api/swagger-core/modules/swagger-core-jakarta) (Apache License 2.0) swagger-integration-jakarta (io.swagger.core.v3:swagger-integration-jakarta:2.2.15 - https://github.com/swagger-api/swagger-core/modules/swagger-integration-jakarta) (Apache Software License, Version 2.0) swagger-java-bitbucket-client (io.dockstore:swagger-java-bitbucket-client:2.0.3 - no url defined) - (Apache Software License, Version 2.0) swagger-java-client (io.dockstore:swagger-java-client:1.15.0-alpha.13 - no url defined) + (Apache Software License, Version 2.0) swagger-java-client (io.dockstore:swagger-java-client:1.15.0-SNAPSHOT - no url defined) (Apache Software License, Version 2.0) swagger-java-discourse-client (io.dockstore:swagger-java-discourse-client:2.0.1 - no url defined) (Apache Software License, Version 2.0) swagger-java-quay-client (io.dockstore:swagger-java-quay-client:2.0.2 - no url defined) (Apache Software License, Version 2.0) swagger-java-sam-client (io.dockstore:swagger-java-sam-client:2.0.2 - no url defined) diff --git a/metricsaggregator/pom.xml b/metricsaggregator/pom.xml index 7a525b01..362f0e83 100644 --- a/metricsaggregator/pom.xml +++ b/metricsaggregator/pom.xml @@ -132,6 +132,14 @@ org.slf4j slf4j-api + + ch.qos.logback + logback-core + + + ch.qos.logback + logback-classic + software.amazon.awssdk s3 @@ -144,6 +152,10 @@ javax.money money-api + + org.apache.commons + commons-csv + org.junit.jupiter junit-jupiter-api @@ -278,6 +290,8 @@ org.glassfish.jersey.inject:jersey-hk2 javax.money:money-api org.javamoney.moneta:moneta-core + ch.qos.logback:logback-classic + ch.qos.logback:logback-core diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java index f84c0bd7..9b78c444 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java @@ -42,7 +42,7 @@ public File getConfig() { } @Parameters(commandNames = { "submit-validation-data" }, commandDescription = "Formats workflow validation data specified in a file then submits it to Dockstore") - public static class SubmitValidationData extends CommandLineArgs { + public static class SubmitValidationData extends CommandLineArgs { @Parameter(names = {"-c", "--config"}, description = "The config file path.") private File config = new File("./" + MetricsAggregatorClient.CONFIG_FILE_NAME); @@ -78,4 +78,45 @@ public Partner getPlatform() { return platform; } } + + @Parameters(commandNames = { "submit-terra-metrics" }, commandDescription = "Formats workflow validation data specified in a file then submits it to Dockstore") + public static class SubmitTerraMetrics extends CommandLineArgs { + @Parameter(names = {"-c", "--config"}, description = "The config file path.") + private File config = new File("./" + MetricsAggregatorClient.CONFIG_FILE_NAME); + + + @Parameter(names = {"-d", "--data"}, description = "The file path to the CSV file containing workflow metrics from Terra. The first line of the file should contain the CSV fields: workflow_id,status,workflow_start,workflow_end,workflow_runtime_minutes,source_url", required = true) + private String dataFilePath; + + @Parameter(names = {"-r", "--recordSkipped"}, description = "Record skipped executions and the reason skipped to a CSV file") + private boolean recordSkippedExecutions; + + public File getConfig() { + return config; + } + + + public String getDataFilePath() { + return dataFilePath; + } + + public boolean isRecordSkippedExecutions() { + return recordSkippedExecutions; + } + + /** + * Headers for the input data file + */ + public enum TerraMetricsCsvHeaders { + workflow_id, status, workflow_start, workflow_end, workflow_runtime_minutes, source_url + } + + /** + * Headers for the output file containing workflow executions that were skipped. + * The headers are the same as the input file headers, with the addition of a "reason" header indicating why an execution was skipped + */ + public enum SkippedTerraMetricsCsvHeaders { + workflow_id, status, workflow_start, workflow_end, workflow_runtime_minutes, source_url, reason_skipped + } + } } diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java index 49036119..6c7e4c06 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java @@ -19,6 +19,7 @@ import static io.dockstore.utils.CLIConstants.FAILURE_EXIT_CODE; import static io.dockstore.utils.ConfigFileUtils.getConfiguration; +import static io.dockstore.utils.DockstoreApiClientUtils.setupApiClient; import com.beust.jcommander.JCommander; import com.beust.jcommander.MissingCommandException; @@ -27,9 +28,9 @@ import io.dockstore.metricsaggregator.MetricsAggregatorConfig; import io.dockstore.metricsaggregator.MetricsAggregatorS3Client; import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.AggregateMetricsCommand; +import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.SubmitTerraMetrics; import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.SubmitValidationData; import io.dockstore.openapi.client.ApiClient; -import io.dockstore.openapi.client.Configuration; import io.dockstore.openapi.client.api.ExtendedGa4GhApi; import io.dockstore.openapi.client.model.ExecutionsRequestBody; import io.dockstore.openapi.client.model.ValidationExecution; @@ -38,6 +39,8 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Optional; import org.apache.commons.configuration2.INIConfiguration; @@ -56,19 +59,22 @@ public class MetricsAggregatorClient { private static final Logger LOG = LoggerFactory.getLogger(MetricsAggregatorClient.class); - public MetricsAggregatorClient() { } public static void main(String[] args) { + final Instant startTime = Instant.now(); MetricsAggregatorClient metricsAggregatorClient = new MetricsAggregatorClient(); final CommandLineArgs commandLineArgs = new CommandLineArgs(); final JCommander jCommander = new JCommander(commandLineArgs); final AggregateMetricsCommand aggregateMetricsCommand = new AggregateMetricsCommand(); final SubmitValidationData submitValidationData = new SubmitValidationData(); + final SubmitTerraMetrics submitTerraMetrics = new SubmitTerraMetrics(); + jCommander.addCommand(aggregateMetricsCommand); jCommander.addCommand(submitValidationData); + jCommander.addCommand(submitTerraMetrics); try { jCommander.parse(args); @@ -117,20 +123,35 @@ public static void main(String[] args) { try { final MetricsAggregatorConfig metricsAggregatorConfig = new MetricsAggregatorConfig(config.get()); metricsAggregatorClient.submitValidationData(metricsAggregatorConfig, submitValidationData.getValidator(), - submitValidationData.getValidatorVersion(), submitValidationData.getDataFilePath(), submitValidationData.getPlatform()); + submitValidationData.getValidatorVersion(), submitValidationData.getDataFilePath(), + submitValidationData.getPlatform()); } catch (Exception e) { LOG.error("Could not submit validation metrics to Dockstore", e); System.exit(FAILURE_EXIT_CODE); } } - } - } + } else if ("submit-terra-metrics".equals(jCommander.getParsedCommand())) { + if (submitTerraMetrics.isHelp()) { + jCommander.usage(); + } else { + final Optional config = getConfiguration(submitTerraMetrics.getConfig()); + if (config.isEmpty()) { + System.exit(FAILURE_EXIT_CODE); + } - private ApiClient setupApiClient(String serverUrl, String token) { - ApiClient apiClient = Configuration.getDefaultApiClient(); - apiClient.setBasePath(serverUrl); - apiClient.addDefaultHeader("Authorization", "Bearer " + token); - return apiClient; + try { + final MetricsAggregatorConfig metricsAggregatorConfig = new MetricsAggregatorConfig(config.get()); + final TerraMetricsSubmitter submitTerraMetricsCommand = new TerraMetricsSubmitter(metricsAggregatorConfig, + submitTerraMetrics); + submitTerraMetricsCommand.submitMetrics(); + } catch (Exception e) { + LOG.error("Could not submit terra metrics to Dockstore", e); + System.exit(FAILURE_EXIT_CODE); + } + } + } + final Instant endTime = Instant.now(); + LOG.info("{} took {}", jCommander.getParsedCommand(), Duration.between(startTime, endTime)); } private void aggregateMetrics(MetricsAggregatorConfig config) throws URISyntaxException { @@ -147,8 +168,8 @@ private void aggregateMetrics(MetricsAggregatorConfig config) throws URISyntaxEx metricsAggregatorS3Client.aggregateMetrics(extendedGa4GhApi); } - - private void submitValidationData(MetricsAggregatorConfig config, ValidatorToolEnum validator, String validatorVersion, String dataFilePath, Partner platform) throws IOException { + private void submitValidationData(MetricsAggregatorConfig config, ValidatorToolEnum validator, String validatorVersion, + String dataFilePath, Partner platform) throws IOException { ApiClient apiClient = setupApiClient(config.getDockstoreServerUrl(), config.getDockstoreToken()); ExtendedGa4GhApi extendedGa4GhApi = new ExtendedGa4GhApi(apiClient); @@ -179,17 +200,16 @@ private void submitValidationData(MetricsAggregatorConfig config, ValidatorToolE continue; } String dateExecuted = lineComponents[DATE_EXECUTED_INDEX]; - ValidationExecution validationExecution = new ValidationExecution() - .validatorTool(validator) - .validatorToolVersion(validatorVersion) - .isValid(isValid); + ValidationExecution validationExecution = new ValidationExecution().validatorTool(validator) + .validatorToolVersion(validatorVersion).isValid(isValid); validationExecution.setDateExecuted(dateExecuted); ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody().validationExecutions(List.of(validationExecution)); try { extendedGa4GhApi.executionMetricsPost(executionsRequestBody, platform.toString(), trsId, versionName, "Validation executions submitted using dockstore-support metricsaggregator"); - System.out.printf("Submitted validation metrics for tool ID %s, version %s, %s validated by %s %s on platform %s%n", trsId, versionName, isValid ? "successfully" : "unsuccessfully", validator, validatorVersion, platform); + System.out.printf("Submitted validation metrics for tool ID %s, version %s, %s validated by %s %s on platform %s%n", trsId, + versionName, isValid ? "successfully" : "unsuccessfully", validator, validatorVersion, platform); } catch (Exception e) { // Could end up here if the workflow no longer exists. Log then continue processing LOG.error("Could not submit validation executions to Dockstore for workflow {}", csvLine, e); diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java new file mode 100644 index 00000000..53012496 --- /dev/null +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java @@ -0,0 +1,326 @@ +package io.dockstore.metricsaggregator.client.cli; + +import static io.dockstore.utils.CLIConstants.FAILURE_EXIT_CODE; +import static io.dockstore.utils.DockstoreApiClientUtils.setupApiClient; + +import io.dockstore.common.Partner; +import io.dockstore.metricsaggregator.MetricsAggregatorConfig; +import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.SubmitTerraMetrics; +import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.SubmitTerraMetrics.SkippedTerraMetricsCsvHeaders; +import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.SubmitTerraMetrics.TerraMetricsCsvHeaders; +import io.dockstore.openapi.client.ApiClient; +import io.dockstore.openapi.client.ApiException; +import io.dockstore.openapi.client.api.ExtendedGa4GhApi; +import io.dockstore.openapi.client.api.WorkflowsApi; +import io.dockstore.openapi.client.model.ExecutionsRequestBody; +import io.dockstore.openapi.client.model.RunExecution; +import io.dockstore.openapi.client.model.RunExecution.ExecutionStatusEnum; +import io.dockstore.openapi.client.model.Workflow.DescriptorTypeEnum; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TerraMetricsSubmitter { + private static final Logger LOG = LoggerFactory.getLogger(MetricsAggregatorClient.class); + private final MetricsAggregatorConfig config; + private final SubmitTerraMetrics submitTerraMetricsCommand; + private int numberOfExecutionsSubmitted = 0; + private int numberOfExecutionsSkipped = 0; + private int numberOfCacheHits = 0; + private int numberOfCacheMisses = 0; + // Keep track of sourceUrls that are found to be ambiguous + private Map sourceUrlsToSkipToReason = new HashMap<>(); + // Map of source url to TRS info that was previously calculated + private Map sourceUrlToSourceUrlTrsInfo = new HashMap<>(); + private Map> workflowPathPrefixToWorkflows = new HashMap<>(); + + + public TerraMetricsSubmitter(MetricsAggregatorConfig config, SubmitTerraMetrics submitTerraMetricsCommand) { + this.config = config; + this.submitTerraMetricsCommand = submitTerraMetricsCommand; + } + + public void submitMetrics() { + ApiClient apiClient = setupApiClient(config.getDockstoreServerUrl(), config.getDockstoreToken()); + ExtendedGa4GhApi extendedGa4GhApi = new ExtendedGa4GhApi(apiClient); + WorkflowsApi workflowsApi = new WorkflowsApi(apiClient); + + // Read CSV file + Iterable workflowMetricRecords = null; + final String inputDateFilePath = this.submitTerraMetricsCommand.getDataFilePath(); + try (BufferedReader metricsBufferedReader = new BufferedReader(new FileReader(inputDateFilePath))) { + //final Reader entriesCsv = new BufferedReader(new FileReader(inputDateFilePath)); + final CSVFormat csvFormat = CSVFormat.DEFAULT.builder() + .setHeader(TerraMetricsCsvHeaders.class) + .setSkipHeaderRecord(true) + .setTrim(true) + .build(); + workflowMetricRecords = csvFormat.parse(metricsBufferedReader); + + final String outputFileName = inputDateFilePath + "_skipped_executions_" + Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace("-", "").replace(":", "") + ".csv"; + + try (CSVPrinter skippedExecutionsCsvPrinter = submitTerraMetricsCommand.isRecordSkippedExecutions() ? new CSVPrinter(new FileWriter(outputFileName, StandardCharsets.UTF_8), CSVFormat.DEFAULT.builder().setHeader(SkippedTerraMetricsCsvHeaders.class).build()) : null) { + for (CSVRecord workflowMetricRecord: workflowMetricRecords) { + final String sourceUrl = workflowMetricRecord.get(TerraMetricsCsvHeaders.source_url); + LOG.info("Processing execution on row {} with source_url {}", workflowMetricRecord.getRecordNumber(), sourceUrl); + + // Check to see if this source_url was skipped before + if (sourceUrlsToSkipToReason.containsKey(sourceUrl)) { + skipExecution(sourceUrl, workflowMetricRecord, sourceUrlsToSkipToReason.get(sourceUrl), skippedExecutionsCsvPrinter); + continue; + } + + // Check to see if we need to figure out the TRS ID for the source URL + if (sourceUrlToSourceUrlTrsInfo.containsKey(sourceUrl)) { + ++numberOfCacheHits; + } else { + ++numberOfCacheMisses; + Optional sourceUrlTrsInfo = calculateTrsInfoFromSourceUrl(workflowMetricRecord, sourceUrl, workflowsApi, skippedExecutionsCsvPrinter); + if (sourceUrlTrsInfo.isEmpty()) { + continue; + } else { + sourceUrlToSourceUrlTrsInfo.put(sourceUrl, sourceUrlTrsInfo.get()); + } + } + + final Optional workflowExecution = getTerraWorkflowExecutionFromCsvRecord(workflowMetricRecord, sourceUrl, skippedExecutionsCsvPrinter); + if (workflowExecution.isEmpty()) { + continue; + } + + final SourceUrlTrsInfo sourceUrlTrsInfo = sourceUrlToSourceUrlTrsInfo.get(sourceUrl); + final ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody().runExecutions(List.of(workflowExecution.get())); + try { + extendedGa4GhApi.executionMetricsPost(executionsRequestBody, Partner.TERRA.toString(), sourceUrlTrsInfo.trsId(), sourceUrlTrsInfo.version(), + "Terra metrics from BigQuery table broad-dsde-prod-analytics-dev.externally_shared_metrics.dockstore_workflow_metrics_Q4_2023"); + ++numberOfExecutionsSubmitted; + } catch (ApiException e) { + skipExecution(sourceUrl, workflowMetricRecord, String.format("Could not submit execution metrics to Dockstore for workflow %s: %s", sourceUrlTrsInfo, e.getMessage()), skippedExecutionsCsvPrinter); + } + } + } catch (IOException e) { + LOG.error("Unable to create new CSV output file", e); + System.exit(FAILURE_EXIT_CODE); + } + + LOG.info("Done submitting executions from Terra. Submitted {} executions. Skipped {} executions. Cache hits: {}. Cache misses: {}", numberOfExecutionsSubmitted, numberOfExecutionsSkipped, numberOfCacheHits, numberOfCacheMisses); + if (submitTerraMetricsCommand.isRecordSkippedExecutions()) { + LOG.info("View skipped executions in file {}", outputFileName); + } + } catch (IOException e) { + LOG.error("Unable to read input CSV file", e); + System.exit(FAILURE_EXIT_CODE); + } + } + + static Optional getExecutionStatusEnumFromTerraStatus(String terraStatus) { + ExecutionStatusEnum executionStatusEnum = switch (terraStatus) { + case "Succeeded" -> ExecutionStatusEnum.SUCCESSFUL; + case "Failed" -> ExecutionStatusEnum.FAILED; + case "Aborted" -> ExecutionStatusEnum.ABORTED; + default -> null; + }; + return Optional.ofNullable(executionStatusEnum); + } + + static Optional formatStringInIso8601Date(String workflowStart) { + // Example workflow_start: 2022-07-15 15:37:06.450000 UTC + final int maxNumberOfMicroSeconds = 9; + final DateTimeFormatter workflowStartFormat = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .appendFraction(ChronoField.MICRO_OF_SECOND, 0, maxNumberOfMicroSeconds, true) // There are varying widths of micro seconds + .optionalStart() // Start optional time zone pattern. Time zone is not always included + .appendPattern(" ") + .appendZoneId() + .optionalEnd() // End optional time zone pattern + .toFormatter(); + + try { + final LocalDateTime localDateTime = LocalDateTime.parse(workflowStart, workflowStartFormat); + return Optional.of(DateTimeFormatter.ISO_INSTANT.format(localDateTime.atOffset(ZoneOffset.UTC))); + } catch (DateTimeParseException e) { + return Optional.empty(); + } + } + + private Optional getPrimaryDescriptorAbsolutePath(WorkflowsApi workflowsApi, MinimalWorkflowInfo workflow, String version) { + try { + return Optional.of(workflowsApi.primaryDescriptor1(workflow.id(), version, workflow.descriptorType().toString()).getAbsolutePath()); + } catch (ApiException e) { + return Optional.empty(); + } + } + + + private List getSourceUrlComponents(String sourceUrl) { + final String rawGitHubUserContentUrlPrefix = "https://raw.githubusercontent.com/"; // The source_url starts with this + final String sourceUrlWithoutGitHubPrefix = sourceUrl.replace(rawGitHubUserContentUrlPrefix, ""); + return Arrays.stream(sourceUrlWithoutGitHubPrefix.split("/")) + .filter(urlComponent -> !urlComponent.isEmpty()) // Filter out empty strings that are a result of consecutive slashes '//' + .toList(); + } + + private void skipExecution(String sourceUrl, CSVRecord csvRecordToSkip, String reason, CSVPrinter skippedExecutionsCsvPrinter) { + LOG.info("Skipping execution on row {}: {}", csvRecordToSkip.getRecordNumber(), reason); + // Record to map for future reference + sourceUrlsToSkipToReason.put(sourceUrl, reason); + if (submitTerraMetricsCommand.isRecordSkippedExecutions()) { + // Record to output CSV file for later examination + // Headers: workflow_id, status, workflow_start, workflow_end, workflow_runtime_minutes, source_url + List csvColumnValues = Arrays.stream(TerraMetricsCsvHeaders.values()) + .map(csvRecordToSkip::get) // Get the column value for the record + .collect(Collectors.toList()); + csvColumnValues.add(reason); + try { + skippedExecutionsCsvPrinter.printRecord(csvColumnValues); + } catch (IOException e) { + LOG.error("Could not write skipped execution to output file"); + } + } + ++numberOfExecutionsSkipped; + } + + public Optional getTerraWorkflowExecutionFromCsvRecord(CSVRecord csvRecord, String sourceUrl, CSVPrinter skippedExecutionsCsvPrinter) { + final String executionId = csvRecord.get(TerraMetricsCsvHeaders.workflow_id); + final String workflowStart = csvRecord.get(TerraMetricsCsvHeaders.workflow_start); + final String status = csvRecord.get(TerraMetricsCsvHeaders.status); + final String workflowRunTimeMinutes = csvRecord.get(TerraMetricsCsvHeaders.workflow_runtime_minutes); + + // Check that all required fields are non-null + if (executionId == null || workflowStart == null || status == null) { + skipExecution(sourceUrl, csvRecord, "One or more of the required fields (workflow_id, workflow_start, status) is missing", skippedExecutionsCsvPrinter); + return Optional.empty(); + } + + // Format fields into Dockstore schema + final Optional executionStatus = getExecutionStatusEnumFromTerraStatus(status); + if (executionStatus.isEmpty()) { + skipExecution(sourceUrl, csvRecord, "Could not get a valid ExecutionStatusEnum from status '" + status + "'", skippedExecutionsCsvPrinter); + return Optional.empty(); + } + + final Optional dateExecuted = formatStringInIso8601Date(workflowStart); + if (dateExecuted.isEmpty()) { + skipExecution(sourceUrl, csvRecord, "Could not get a valid dateExecuted from workflow_start '" + workflowStart + "'", skippedExecutionsCsvPrinter); + return Optional.empty(); + } + + RunExecution workflowExecution = new RunExecution(); + // TODO: uncomment below when the update executions endpoint PR is merged + // workflowExecution.setExecutionId(executionId); + workflowExecution.setExecutionStatus(executionStatus.get()); + workflowExecution.setDateExecuted(dateExecuted.get()); + getExecutionTime(workflowRunTimeMinutes).ifPresent(workflowExecution::setExecutionTime); + return Optional.of(workflowExecution); + } + + static Optional getExecutionTime(String workflowRunTimeMinutes) { + if (StringUtils.isBlank(workflowRunTimeMinutes)) { + return Optional.empty(); + } + return Optional.of(String.format("PT%sM", workflowRunTimeMinutes)); + } + + private Optional calculateTrsInfoFromSourceUrl(CSVRecord workflowMetricRecord, String sourceUrl, WorkflowsApi workflowsApi, CSVPrinter skippedExecutionsCsvPrinter) { + // Need to figure out the TRS ID and version name using the source_url. + // Example source_url: https://raw.githubusercontent.com/theiagen/public_health_viral_genomics/v2.0.0/workflows/wf_theiacov_fasta.wdl + // Organization = "theiagen/public_health_viral_genomics", version = "v2.0.0", the rest is the primary descriptor path + // Note that the TRS ID may also have a workflow name, which we need to figure out + final List sourceUrlComponents = getSourceUrlComponents(sourceUrl); + + // There should be at least three elements in order for there to be an organization name, foo>/, and version + // in /// + final int minNumberOfComponents = 3; + if (sourceUrlComponents.size() >= minNumberOfComponents) { + final String organization = sourceUrlComponents.get(0) + "/" + sourceUrlComponents.get(1); + final String version = sourceUrlComponents.get(2); + final String primaryDescriptorPathFromUrl = "/" + String.join("/", sourceUrlComponents.subList(3, sourceUrlComponents.size())); + + final String workflowPathPrefix = "github.com/" + organization; + if (!workflowPathPrefixToWorkflows.containsKey(workflowPathPrefix)) { + try { + List publishedWorkflowsWithSamePathPrefix = workflowsApi.getAllPublishedWorkflowByPath(workflowPathPrefix).stream() + .map(workflow -> new MinimalWorkflowInfo(workflow.getId(), workflow.getFullWorkflowPath(), workflow.getDescriptorType(), new HashMap<>())) + .toList(); + workflowPathPrefixToWorkflows.put(workflowPathPrefix, publishedWorkflowsWithSamePathPrefix); + } catch (ApiException e) { + skipExecution(sourceUrl, workflowMetricRecord, + "Could not get all published workflows for workflow path " + workflowPathPrefix + " to determine TRS ID", + skippedExecutionsCsvPrinter); + return Optional.empty(); + } + } + + List workflowsFromSameRepo = workflowPathPrefixToWorkflows.get(workflowPathPrefix); + + List foundFullWorkflowPaths = new ArrayList<>(); + workflowsFromSameRepo.forEach(workflow -> { + if (!workflow.versionToPrimaryDescriptorPath().containsKey(version)) { + // Intentionally putting null in map to indicate that the primary descriptor path for the workflow version doesn't exist + workflow.versionToPrimaryDescriptorPath().put(version, getPrimaryDescriptorAbsolutePath(workflowsApi, workflow, version).orElse(null)); + } + + final String primaryDescriptorPathForVersion = workflow.versionToPrimaryDescriptorPath().get(version); + if (primaryDescriptorPathFromUrl.equals(primaryDescriptorPathForVersion)) { + // Collect a list of workflow paths that have the primary descriptor path we're looking for + foundFullWorkflowPaths.add(workflow.fullWorkflowPath()); + } + }); + + + if (foundFullWorkflowPaths.isEmpty()) { + skipExecution(sourceUrl, workflowMetricRecord, "Could not find workflow with primary descriptor " + primaryDescriptorPathFromUrl, skippedExecutionsCsvPrinter); + return Optional.empty(); + } else if (foundFullWorkflowPaths.size() > 1) { + // There is already a workflow in the same repository with the same descriptor path that we're looking for. + // Skip this source url because it is an ambiguous case and we can't identify which workflow the source url is referring to. + skipExecution(sourceUrl, workflowMetricRecord, "There's more than one workflow in the repository with the same primary descriptor path", skippedExecutionsCsvPrinter); + return Optional.empty(); + } else { + final SourceUrlTrsInfo sourceUrlTrsInfo = new SourceUrlTrsInfo(sourceUrl, "#workflow/" + foundFullWorkflowPaths.get(0), version); + return Optional.of(sourceUrlTrsInfo); + } + } else { + skipExecution(sourceUrl, workflowMetricRecord, "Not enough components in the source_url to figure out the TRS ID and version", skippedExecutionsCsvPrinter); + return Optional.empty(); + } + } + + /** + * Record that stores the calculated TRS ID and version, derived from the source_url + * @param sourceUrl + * @param trsId + * @param version + */ + public record SourceUrlTrsInfo(String sourceUrl, String trsId, String version) { + } + + public record MinimalWorkflowInfo(long id, String fullWorkflowPath, DescriptorTypeEnum descriptorType, Map versionToPrimaryDescriptorPath) { + } + + public record WorkflowsFromSameOrganizationInfo(String workflowPathPrefix, String organization, Map primaryDescriptorPathToFullWorkflowPath) { + } +} diff --git a/metricsaggregator/src/main/resources/logback.xml b/metricsaggregator/src/main/resources/logback.xml index ef430a56..3f2d6d36 100644 --- a/metricsaggregator/src/main/resources/logback.xml +++ b/metricsaggregator/src/main/resources/logback.xml @@ -23,7 +23,7 @@ - + diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitterTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitterTest.java new file mode 100644 index 00000000..8b038362 --- /dev/null +++ b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitterTest.java @@ -0,0 +1,40 @@ +package io.dockstore.metricsaggregator.client.cli; + +import static io.dockstore.metricsaggregator.client.cli.TerraMetricsSubmitter.formatStringInIso8601Date; +import static io.dockstore.metricsaggregator.client.cli.TerraMetricsSubmitter.getExecutionTime; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.dockstore.openapi.client.model.RunExecution.ExecutionStatusEnum; +import org.junit.jupiter.api.Test; + +class TerraMetricsSubmitterTest { + + @Test + void testFormatStringInIso8601Date() { + assertEquals("2022-07-15T15:37:06.123456Z", formatStringInIso8601Date("2022-07-15 15:37:06.123456 UTC").get()); + assertEquals("2022-07-15T15:37:06.450Z", formatStringInIso8601Date("2022-07-15 15:37:06.450000 UTC").get()); + assertEquals("2022-06-30T00:33:44.101Z", formatStringInIso8601Date("2022-06-30 00:33:44.101000 UTC").get()); + assertEquals("2022-09-06T13:46:53Z", formatStringInIso8601Date("2022-09-06 13:46:53").get()); + assertEquals("2022-05-25T13:13:08.510Z", formatStringInIso8601Date("2022-05-25 13:13:08.51 UTC").get()); + assertEquals("2022-12-01T01:52:05.700Z", formatStringInIso8601Date("2022-12-01 01:52:05.7 UTC").get()); + } + + @Test + void testGetExecutionStatusEnumFromTerraStatus() { + assertEquals(ExecutionStatusEnum.SUCCESSFUL, TerraMetricsSubmitter.getExecutionStatusEnumFromTerraStatus("Succeeded").get()); + assertEquals(ExecutionStatusEnum.FAILED, TerraMetricsSubmitter.getExecutionStatusEnumFromTerraStatus("Failed").get()); + assertEquals(ExecutionStatusEnum.ABORTED, TerraMetricsSubmitter.getExecutionStatusEnumFromTerraStatus("Aborted").get()); + // These two statuses are present in the workflow executions Terra provided, but they don't represent a completed execution + assertTrue(TerraMetricsSubmitter.getExecutionStatusEnumFromTerraStatus("Aborting").isEmpty()); + assertTrue(TerraMetricsSubmitter.getExecutionStatusEnumFromTerraStatus("Running").isEmpty()); + } + + @Test + void testGetExecutionTime() { + assertEquals("PT100M", getExecutionTime("100").get()); + assertTrue(getExecutionTime("").isEmpty()); + assertTrue(getExecutionTime(" ").isEmpty()); + assertTrue(getExecutionTime(null).isEmpty()); + } +} diff --git a/pom.xml b/pom.xml index 4538516f..5ada9db7 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ scm:git:git@github.com:dockstore/dockstore-support.git UTF-8 - 1.15.0-alpha.13 + 1.15.0-SNAPSHOT 3.0.0-M5 2.22.2 false @@ -162,6 +162,11 @@ jersey-hk2 3.0.11 + + org.apache.commons + commons-csv + 1.10.0 + @@ -239,10 +244,6 @@ true - - - **/logback.xml - diff --git a/topicgenerator/pom.xml b/topicgenerator/pom.xml index 64ee2aa7..95ab3576 100644 --- a/topicgenerator/pom.xml +++ b/topicgenerator/pom.xml @@ -129,7 +129,6 @@ org.apache.commons commons-csv - 1.10.0 diff --git a/utils/pom.xml b/utils/pom.xml index 9157a180..e6439bb5 100644 --- a/utils/pom.xml +++ b/utils/pom.xml @@ -89,6 +89,11 @@ commons-beanutils commons-beanutils + + io.dockstore + openapi-java-client + ${dockstore-core.version} + diff --git a/utils/src/main/java/io/dockstore/utils/DockstoreApiClientUtils.java b/utils/src/main/java/io/dockstore/utils/DockstoreApiClientUtils.java new file mode 100644 index 00000000..b65d1e99 --- /dev/null +++ b/utils/src/main/java/io/dockstore/utils/DockstoreApiClientUtils.java @@ -0,0 +1,16 @@ +package io.dockstore.utils; + +import io.dockstore.openapi.client.ApiClient; +import io.dockstore.openapi.client.Configuration; + +public final class DockstoreApiClientUtils { + private DockstoreApiClientUtils() { + } + + public static ApiClient setupApiClient(String serverUrl, String token) { + ApiClient apiClient = Configuration.getDefaultApiClient(); + apiClient.setBasePath(serverUrl); + apiClient.addDefaultHeader("Authorization", "Bearer " + token); + return apiClient; + } +}