Ingest Terra metrics #478

Merged · 20 commits · Jan 19, 2024
40 changes: 37 additions & 3 deletions metricsaggregator/README.md
@@ -69,16 +69,39 @@ Usage: <main class> [options] [command] [command options]
format of the workflows that were validated by the validator
specified. The first line of the file should contain the CSV
fields: trsID,versionName,isValid,dateExecuted
-id, --executionId
The execution ID to use for each validation execution. Assumes
that each validation in the file is performed on unique workflows
and workflow versions.
--help
Prints help for metricsaggregator
* -p, --platform
The platform that the workflow was validated on
Possible Values: [GALAXY, TERRA, DNA_STACK, DNA_NEXUS, CGC, NHLBI_BIODATA_CATALYST, ANVIL, CAVATICA, NEXTFLOW_TOWER, ELWAZI, AGC, OTHER]
Possible Values: [GALAXY, TERRA, DNA_STACK, DNA_NEXUS, CGC, NHLBI_BIODATA_CATALYST, ANVIL, CAVATICA, NEXTFLOW_TOWER, ELWAZI, AGC, OTHER, ALL]
* -v, --validator
The validator tool used to validate the workflows
Possible Values: [MINIWDL, WOMTOOL, CWLTOOL, NF_VALIDATION, OTHER]
* -vv, --validatorVersion
The version of the validator tool used to validate the workflows

submit-terra-metrics Submits workflow metrics provided by Terra via a
CSV file to Dockstore
Usage: submit-terra-metrics [options]
Options:
-c, --config
The config file path.
Default: ./metrics-aggregator.config
* -d, --data
The file path to the CSV file containing workflow metrics from
Terra. The first line of the file should contain the CSV fields: workflow_id,status,workflow_start,workflow_end,workflow_runtime_minutes,source_url
-de, --description
Optional description about the metrics to include when submitting
metrics to Dockstore
--help
Prints help for metricsaggregator
-r, --recordSkipped
Record skipped executions and the reason skipped to a CSV file
Default: false
```

### aggregate-metrics
@@ -98,7 +121,18 @@ with miniwdl on DNAstack:

```
java -jar target/metricsaggregator-*-SNAPSHOT.jar submit-validation-data --config my-custom-config \
--data <path-to-my-data-file> --validator MINIWDL --validatorVersion 1.0 --platform DNA_STACK
--data <path-to-my-data-file> --validator MINIWDL --validatorVersion 1.0 --platform DNA_STACK --executionId a02075d9-092a-4fe7-9f83-4abf11de3dc9
```

After running this command, you will want to run the `aggregate-metrics` command to aggregate the new validation data submitted.
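
For reference, a `submit-validation-data` input file might look like the following; the workflow IDs, versions, and dates here are hypothetical:

```
trsID,versionName,isValid,dateExecuted
#workflow/github.com/example/hello-wdl,1.0,true,2024-01-10T12:00:00Z
#workflow/github.com/example/hello-wdl,1.1,false,2024-01-11T08:30:00Z
```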

### submit-terra-metrics

The following is an example command that submits metrics from a CSV file provided by Terra, recording any skipped executions, along with the reason they were skipped, to an output CSV file.

```
java -jar target/metricsaggregator-*-SNAPSHOT.jar submit-terra-metrics --config my-custom-config \
--data <path-to-terra-metrics-csv-file> --recordSkipped
```

After running this command, you will want to run the `aggregate-metrics` command to aggregate the new validation data submitted.
After running this command, you will want to run the `aggregate-metrics` command to aggregate the new Terra metrics submitted.
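
For reference, a Terra metrics input file might look like the following; the workflow ID, timestamps, and source URL are hypothetical:

```
workflow_id,status,workflow_start,workflow_end,workflow_runtime_minutes,source_url
00001111-2222-3333-4444-555566667777,Succeeded,2024-01-01 12:00:00 UTC,2024-01-01 13:30:00 UTC,90,https://raw.githubusercontent.com/example/hello-wdl/main/hello.wdl
```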
14 changes: 14 additions & 0 deletions metricsaggregator/pom.xml
@@ -132,6 +132,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
@@ -144,6 +152,10 @@
<groupId>javax.money</groupId>
<artifactId>money-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
@@ -278,6 +290,8 @@
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
<usedDependency>javax.money:money-api</usedDependency>
<usedDependency>org.javamoney.moneta:moneta-core</usedDependency>
<usedDependency>ch.qos.logback:logback-classic</usedDependency>
<usedDependency>ch.qos.logback:logback-core</usedDependency>
</usedDependencies>
</configuration>
</execution>
metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java
@@ -39,8 +39,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
@@ -52,6 +53,9 @@

private static final Logger LOG = LoggerFactory.getLogger(MetricsAggregatorS3Client.class);
private static final Gson GSON = new Gson();
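// Thread-safe counters: aggregateMetrics processes directories on a parallel stream, so these fields are incremented from multiple threads.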
private final AtomicInteger numberOfDirectoriesProcessed = new AtomicInteger(0);
private final AtomicInteger numberOfMetricsSubmitted = new AtomicInteger(0);
private final AtomicInteger numberOfMetricsSkipped = new AtomicInteger(0);

private final String bucketName;

@@ -71,60 +75,80 @@
}

public void aggregateMetrics(ExtendedGa4GhApi extendedGa4GhApi) {
LOG.info("Getting directories to process...");
List<S3DirectoryInfo> metricsDirectories = getDirectories();

if (metricsDirectories.isEmpty()) {
System.out.println("No directories found to aggregate metrics");
LOG.info("No directories found to aggregate metrics");

return;
}

System.out.println("Aggregating metrics...");
for (S3DirectoryInfo directoryInfo : metricsDirectories) {
String toolId = directoryInfo.toolId();
String versionName = directoryInfo.versionId();
List<String> platforms = directoryInfo.platforms();
String platformsString = String.join(", ", platforms);
String versionS3KeyPrefix = directoryInfo.versionS3KeyPrefix();
LOG.info("Aggregating metrics for {} directories", metricsDirectories.size());
metricsDirectories.stream()
.parallel()
.forEach(directoryInfo -> aggregateMetricsForDirectory(directoryInfo, extendedGa4GhApi));
LOG.info("Completed aggregating metrics. Processed {} directories, submitted {} platform metrics, and skipped {} platform metrics", numberOfDirectoriesProcessed, numberOfMetricsSubmitted, numberOfMetricsSkipped);
}

// Collect metrics for each platform, so we can calculate metrics across all platforms
List<Metrics> allMetrics = new ArrayList<>();
for (String platform : platforms) {
ExecutionsRequestBody allSubmissions;
try {
allSubmissions = getExecutions(toolId, versionName, platform);
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e);
continue; // Continue aggregating metrics for other directories
}
private void aggregateMetricsForDirectory(S3DirectoryInfo directoryInfo, ExtendedGa4GhApi extendedGa4GhApi) {
LOG.info("Processing directory {}", directoryInfo);
String toolId = directoryInfo.toolId();
String versionName = directoryInfo.versionId();
List<String> platforms = directoryInfo.platforms();
String platformsString = String.join(", ", platforms);
String versionS3KeyPrefix = directoryInfo.versionS3KeyPrefix();

try {
getAggregatedMetrics(allSubmissions).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, platform, toolId, versionName);
System.out.printf("Aggregated metrics for tool ID %s, version %s, platform %s from directory %s%n", toolId, versionName, platform, versionS3KeyPrefix);
allMetrics.add(metrics);
});
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e);
// Continue aggregating metrics for other platforms
}
// Collect metrics for each platform, so we can calculate metrics across all platforms
List<Metrics> allMetrics = new ArrayList<>();
for (String platform : platforms) {
ExecutionsRequestBody allSubmissions;
try {
allSubmissions = getExecutions(toolId, versionName, platform);
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e);
numberOfMetricsSkipped.incrementAndGet();
continue; // Continue aggregating metrics for other directories

}

if (!allMetrics.isEmpty()) {
// Calculate metrics across all platforms by aggregating the aggregated metrics from each platform
try {
getAggregatedMetrics(allMetrics).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, Partner.ALL.name(), toolId, versionName);
System.out.printf("Aggregated metrics across all platforms (%s) for tool ID %s, version %s from directory %s%n",
platformsString, toolId, versionName, versionS3KeyPrefix);
allMetrics.add(metrics);
});
} catch (Exception e) {
LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformsString, toolId, versionName, versionS3KeyPrefix, e);
// Continue aggregating metrics for other directories
try {
Optional<Metrics> aggregatedPlatformMetric = getAggregatedMetrics(allSubmissions);
if (aggregatedPlatformMetric.isPresent()) {
extendedGa4GhApi.aggregatedMetricsPut(aggregatedPlatformMetric.get(), platform, toolId, versionName);
LOG.info("Aggregated metrics for tool ID {}, version {}, platform {} from directory {}", toolId, versionName, platform,
versionS3KeyPrefix);
allMetrics.add(aggregatedPlatformMetric.get());
numberOfMetricsSubmitted.incrementAndGet();
} else {
LOG.error("Error aggregating metrics for tool ID {}, version {}, platform {} from directory {}", toolId, versionName, platform, versionS3KeyPrefix);

}
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e);
numberOfMetricsSkipped.incrementAndGet();

// Continue aggregating metrics for other platforms
}
}
System.out.println("Completed aggregating metrics");

if (!allMetrics.isEmpty()) {
// Calculate metrics across all platforms by aggregating the aggregated metrics from each platform
try {
getAggregatedMetrics(allMetrics).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, Partner.ALL.name(), toolId, versionName);
LOG.info("Aggregated metrics across all platforms ({}) for tool ID {}, version {} from directory {}",
platformsString, toolId, versionName, versionS3KeyPrefix);
allMetrics.add(metrics);
numberOfMetricsSubmitted.incrementAndGet();
});
} catch (Exception e) {
LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformsString, toolId, versionName, versionS3KeyPrefix, e);
numberOfMetricsSkipped.incrementAndGet();

// Continue aggregating metrics for other directories
}
} else {
LOG.error("Error aggregating metrics for directory {}: no platform metrics aggregated", versionS3KeyPrefix);
numberOfMetricsSkipped.incrementAndGet();

}
numberOfDirectoriesProcessed.incrementAndGet();
LOG.info("Processed {} directories", numberOfDirectoriesProcessed);
}

/**
@@ -152,7 +176,7 @@
try {
executionsFromOneSubmission = GSON.fromJson(fileContent, ExecutionsRequestBody.class);
} catch (JsonSyntaxException e) {
LOG.error("Could not read execution(s) from S3 key {}, ignoring file", metricsData.s3Key());
LOG.error("Could not read execution(s) from S3 key {}, ignoring file", metricsData.s3Key(), e);

continue;
}

@@ -196,20 +220,6 @@
.aggregatedExecutions(executionIdToAggregatedExecutionMap.values().stream().toList());
}

/**
* If the execution ID is null, generate a random one for the purposes of aggregation.
* Executions that were submitted to S3 prior to the existence of execution IDs don't have an execution ID,
* thus for the purposes of aggregation, generate one.
* @param executionId
* @return
*/
private String generateExecutionIdIfNull(String executionId) {
if (executionId == null) {
return UUID.randomUUID().toString();
}
return executionId;
}

/**
* Returns a unique list of directories containing metrics files.
* For example, suppose the local-dockstore-metrics-data bucket looks like the following.
metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java
Expand Up @@ -85,4 +85,51 @@
return executionId;
}
}

@Parameters(commandNames = { "submit-terra-metrics" }, commandDescription = "Submits workflow metrics provided by Terra via a CSV file to Dockstore")
public static class SubmitTerraMetrics extends CommandLineArgs {
@Parameter(names = {"-c", "--config"}, description = "The config file path.")
private File config = new File("./" + MetricsAggregatorClient.CONFIG_FILE_NAME);


@Parameter(names = {"-d", "--data"}, description = "The file path to the CSV file containing workflow metrics from Terra. The first line of the file should contain the CSV fields: workflow_id,status,workflow_start,workflow_end,workflow_runtime_minutes,source_url", required = true)
private String dataFilePath;

@Parameter(names = {"-r", "--recordSkipped"}, description = "Record skipped executions and the reason skipped to a CSV file")
private boolean recordSkippedExecutions;

@Parameter(names = {"-de", "--description"}, description = "Optional description about the metrics to include when submitting metrics to Dockstore")
private String description;

public File getConfig() {
return config;
}

public String getDataFilePath() {
return dataFilePath;
}

public String getDescription() {
return description;
}

public boolean isRecordSkippedExecutions() {
return recordSkippedExecutions;
}

/**
* Headers for the input data file
*/
public enum TerraMetricsCsvHeaders {
workflow_id, status, workflow_start, workflow_end, workflow_runtime_minutes, source_url
}

/**
* Headers for the output file containing workflow executions that were skipped.
* The headers are the same as the input file headers, with the addition of a "reason" header indicating why an execution was skipped
*/
public enum SkippedTerraMetricsCsvHeaders {
workflow_id, status, workflow_start, workflow_end, workflow_runtime_minutes, source_url, reason_skipped

}
}
}
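
To tie these pieces together, the following is a minimal, hypothetical sketch of how the Terra CSV could be read with Apache Commons CSV (the dependency added to the pom above), using `TerraMetricsCsvHeaders` to name the columns. The `TerraMetricsReader` class and its printing logic are illustrative assumptions, not the PR's actual submitter implementation.

```
import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.SubmitTerraMetrics.TerraMetricsCsvHeaders;
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

// Hypothetical sketch, not part of this PR: read a Terra metrics CSV using the header enum above.
public final class TerraMetricsReader {
    public static void main(String[] args) throws IOException {
        CSVFormat format = CSVFormat.DEFAULT.builder()
                .setHeader(TerraMetricsCsvHeaders.class) // enum constant names double as the expected column names
                .setSkipHeaderRecord(true) // the first line of the file is the header row, not data
                .build();
        try (Reader reader = Files.newBufferedReader(Path.of(args[0]));
                CSVParser parser = format.parse(reader)) {
            for (CSVRecord record : parser) {
                // A real submitter would convert each row into a Dockstore execution and submit it;
                // here we only echo a few fields.
                System.out.printf("%s %s %s%n",
                        record.get(TerraMetricsCsvHeaders.workflow_id),
                        record.get(TerraMetricsCsvHeaders.status),
                        record.get(TerraMetricsCsvHeaders.source_url));
            }
        }
    }
}
```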