From e94b9c5e411932781c8400e30ed5d9345b62b64e Mon Sep 17 00:00:00 2001 From: Kathy Tran Date: Fri, 13 Sep 2024 11:32:25 -0400 Subject: [PATCH] Remove the java aggregation --- metricsaggregator/pom.xml | 10 - .../metricsaggregator/DoubleStatistics.java | 114 ------ .../MetricsAggregatorAthenaClient.java | 53 ++- .../MetricsAggregatorS3Client.java | 151 -------- .../metricsaggregator/MoneyStatistics.java | 90 ----- .../metricsaggregator/Statistics.java | 104 ----- .../client/cli/CommandLineArgs.java | 14 +- .../client/cli/MetricsAggregatorClient.java | 16 +- .../helper/AggregationHelper.java | 75 ---- .../helper/AthenaAggregator.java | 4 + .../helper/CostAggregator.java | 89 ----- .../helper/CpuAggregator.java | 80 ---- .../helper/ExecutionAggregator.java | 109 ------ .../helper/ExecutionStatusAggregator.java | 164 -------- .../helper/ExecutionTimeAggregator.java | 115 ------ .../helper/MemoryAggregator.java | 76 ---- .../helper/RunExecutionAggregator.java | 62 --- .../helper/RunExecutionAthenaAggregator.java | 1 + .../helper/ValidationExecutionAggregator.java | 33 -- .../helper/ValidationStatusAggregator.java | 201 ---------- .../DoubleStatisticsTest.java | 36 -- .../MoneyStatisticsTest.java | 28 -- .../client/cli/MetricsAggregatorClientIT.java | 4 + .../helper/AggregationHelperTest.java | 100 ----- .../helper/CostAggregatorTest.java | 81 ---- .../helper/CpuAggregatorTest.java | 80 ---- .../helper/ExecutionStatusAggregatorTest.java | 362 ------------------ .../helper/ExecutionTimeAggregatorTest.java | 88 ----- .../helper/MemoryAggregatorTest.java | 80 ---- 29 files changed, 57 insertions(+), 2363 deletions(-) delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/DoubleStatistics.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MoneyStatistics.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/Statistics.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AggregationHelper.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CostAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CpuAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/MemoryAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationExecutionAggregator.java delete mode 100644 metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationStatusAggregator.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/DoubleStatisticsTest.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/MoneyStatisticsTest.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/AggregationHelperTest.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CostAggregatorTest.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CpuAggregatorTest.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregatorTest.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregatorTest.java delete mode 100644 metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/MemoryAggregatorTest.java diff --git a/metricsaggregator/pom.xml b/metricsaggregator/pom.xml index 58418089..1cd1e05d 100644 --- a/metricsaggregator/pom.xml +++ b/metricsaggregator/pom.xml @@ -160,14 +160,6 @@ software.amazon.awssdk aws-core - - org.javamoney.moneta - moneta-core - - - javax.money - money-api - org.apache.commons commons-csv @@ -326,8 +318,6 @@ org.slf4j:slf4j-api software.amazon.awssdk:s3 org.glassfish.jersey.inject:jersey-hk2 - javax.money:money-api - org.javamoney.moneta:moneta-core ch.qos.logback:logback-classic ch.qos.logback:logback-core com.google.guava:guava diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/DoubleStatistics.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/DoubleStatistics.java deleted file mode 100644 index a5604573..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/DoubleStatistics.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2023 OICR and UCSC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.dockstore.metricsaggregator; - -import java.util.List; - -/** - * Record that contains statistical information obtained from a list of Doubles. - */ -public class DoubleStatistics extends Statistics { - public DoubleStatistics() { - super(); - } - - /** - * Constructor that calculates statistical information from the provided list of data points. - * @param dataPoints List of Doubles - */ - public DoubleStatistics(List dataPoints) { - super(dataPoints); - } - - public DoubleStatistics(Double minimum, Double maximum, Double average, int numberOfDataPoints) { - super(minimum, maximum, average, numberOfDataPoints); - } - - /** - * Constructor used to create a Statistics object that can be used to calculate weighted averages for non-Statistics objects. - * A placeholder value is set for the min and maximum fields - * @param average - * @param numberOfDataPoints - */ - public DoubleStatistics(double average, int numberOfDataPoints) { - super(0d, 0d, average, numberOfDataPoints); - } - - /** - * Create a new Statistics object from a list of statistics by aggregating the list of statistics - * @param statistics - * @return - */ - public static DoubleStatistics createFromStatistics(List statistics) { - if (statistics.size() == 1) { - return statistics.get(0); - } - - DoubleStatistics newStatistics = new DoubleStatistics(); - newStatistics.setAverage(statistics); - newStatistics.setMinimum(statistics); - newStatistics.setMaximum(statistics); - newStatistics.setNumberOfDataPoints(statistics); - return newStatistics; - } - - /** - * Get the lowest value from the list of data points. - * @param dataPoints - * @return - */ - @Override - public Double calculateMinimum(List dataPoints) { - return dataPoints.stream().mapToDouble(d -> d).min().orElse(0); - } - - /** - * Get the highest value from the list of data points. - * @param dataPoints - * @return - */ - @Override - public Double calculateMaximum(List dataPoints) { - return dataPoints.stream().mapToDouble(d -> d).max().orElse(0); - } - - /** - * Calculate the average from the list of data points. - * @param dataPoints - * @return - */ - @Override - public Double calculateAverage(List dataPoints) { - return dataPoints.stream().mapToDouble(d -> d).average().orElse(0); - } - - /** - * Calculate a weighted average - */ - @Override - public Double calculateWeightedAverage(List> statistics) { - int totalNumberOfDataPoints = getTotalNumberOfDataPoints(statistics); - return statistics.stream() - .map(stat -> { - double weight = (double)stat.getNumberOfDataPoints() / (double)totalNumberOfDataPoints; - return stat.getAverage() * weight; - }) - .mapToDouble(Double::doubleValue) - .sum(); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorAthenaClient.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorAthenaClient.java index 8ad2cccc..874b6c9d 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorAthenaClient.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorAthenaClient.java @@ -51,7 +51,7 @@ public class MetricsAggregatorAthenaClient { private final AtomicInteger numberOfVersionsSubmitted = new AtomicInteger(0); private final AtomicInteger numberOfVersionsSkipped = new AtomicInteger(0); - public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config) { + public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config, boolean isDryRun) { this.metricsBucketName = config.getS3Config().bucket(); this.athenaWorkgroup = config.getAthenaConfig().workgroup(); @@ -63,8 +63,10 @@ public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config) { this.executionStatusAggregator = new ExecutionStatusAthenaAggregator(this, tableName); this.validationStatusAggregator = new ValidationStatusAthenaAggregator(this, tableName); - AthenaAggregator.createDatabase(databaseName, this); - AthenaAggregator.createTable(tableName, metricsBucketName, metadataApi, this); + if (!isDryRun) { + AthenaAggregator.createDatabase(databaseName, this); + AthenaAggregator.createTable(tableName, metricsBucketName, metadataApi, this); + } } /** @@ -73,10 +75,10 @@ public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config) { * @param extendedGa4GhApi * @param skipPostingToDockstore */ - public void aggregateMetrics(List s3DirectoriesToAggregate, ExtendedGa4GhApi extendedGa4GhApi, boolean skipPostingToDockstore) { + public void aggregateMetrics(List s3DirectoriesToAggregate, ExtendedGa4GhApi extendedGa4GhApi, boolean skipPostingToDockstore, boolean isDryRun) { // Aggregate metrics for each directory s3DirectoriesToAggregate.stream().parallel().forEach(s3DirectoryInfo -> { - Map platformToMetrics = getAggregatedMetricsForPlatforms(s3DirectoryInfo); + Map platformToMetrics = getAggregatedMetricsForPlatforms(s3DirectoryInfo, isDryRun); if (platformToMetrics.isEmpty()) { LOG.error("No metrics were aggregated for tool ID: {}, version {}", s3DirectoryInfo.toolId(), s3DirectoryInfo.versionId()); numberOfVersionsSkipped.incrementAndGet(); @@ -143,26 +145,35 @@ public List executeQuery(String query) throws AwsServiceExceptio * @param s3DirectoryInfo * @return */ - public Map getAggregatedMetricsForPlatforms(S3DirectoryInfo s3DirectoryInfo) { + public Map getAggregatedMetricsForPlatforms(S3DirectoryInfo s3DirectoryInfo, boolean isDryRun) { LOG.info("Aggregating metrics for directory: {}", s3DirectoryInfo.versionS3KeyPrefix()); Map platformToMetrics = new HashMap<>(); AthenaTablePartition athenaTablePartition = s3DirectoryInfo.athenaTablePartition(); try { - // Calculate metrics for runexecutions - Map executionStatusMetricByPlatform = executionStatusAggregator.createMetricByPlatform(athenaTablePartition); - // Calculate metrics for validationexecutions - Map validationStatusMetricByPlatform = validationStatusAggregator.createMetricByPlatform(athenaTablePartition); - - s3DirectoryInfo.platforms().forEach(platform -> { - ExecutionStatusMetric executionStatusMetric = executionStatusMetricByPlatform.get(platform); - ValidationStatusMetric validationStatusMetric = validationStatusMetricByPlatform.get(platform); - - if (executionStatusMetric != null || validationStatusMetric != null) { - platformToMetrics.putIfAbsent(platform, new Metrics().executionStatusCount(executionStatusMetric).validationStatus(validationStatusMetric)); - LOG.info("Aggregated metrics for tool ID {}, version {}, platform {} from directory {}", s3DirectoryInfo.toolId(), - s3DirectoryInfo.versionId(), platform, s3DirectoryInfo.versionS3KeyPrefix()); - } - }); + if (isDryRun) { + executionStatusAggregator.printQuery(athenaTablePartition); + validationStatusAggregator.printQuery(athenaTablePartition); + } else { + + // Calculate metrics for runexecutions + Map executionStatusMetricByPlatform = executionStatusAggregator.createMetricByPlatform( + athenaTablePartition); + // Calculate metrics for validationexecutions + Map validationStatusMetricByPlatform = validationStatusAggregator.createMetricByPlatform( + athenaTablePartition); + + s3DirectoryInfo.platforms().forEach(platform -> { + ExecutionStatusMetric executionStatusMetric = executionStatusMetricByPlatform.get(platform); + ValidationStatusMetric validationStatusMetric = validationStatusMetricByPlatform.get(platform); + + if (executionStatusMetric != null || validationStatusMetric != null) { + platformToMetrics.putIfAbsent(platform, + new Metrics().executionStatusCount(executionStatusMetric).validationStatus(validationStatusMetric)); + LOG.info("Aggregated metrics for tool ID {}, version {}, platform {} from directory {}", s3DirectoryInfo.toolId(), + s3DirectoryInfo.versionId(), platform, s3DirectoryInfo.versionS3KeyPrefix()); + } + }); + } } catch (Exception e) { // Log error and continue LOG.error("Could not aggregate metrics for tool ID {}, version {}", s3DirectoryInfo.toolId(), s3DirectoryInfo.versionId(), e); diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java index 370aec63..7723e3f8 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java @@ -17,31 +17,13 @@ package io.dockstore.metricsaggregator; -import static io.dockstore.metricsaggregator.helper.AggregationHelper.getAggregatedMetrics; - -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; -import io.dockstore.common.Partner; import io.dockstore.common.S3ClientHelper; -import io.dockstore.common.metrics.ExecutionsRequestBody; -import io.dockstore.common.metrics.MetricsData; -import io.dockstore.common.metrics.MetricsDataS3Client; -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.common.metrics.ValidationExecution; import io.dockstore.metricsaggregator.MetricsAggregatorAthenaClient.AthenaTablePartition; -import io.dockstore.openapi.client.api.ExtendedGa4GhApi; -import io.dockstore.openapi.client.model.Metrics; -import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; @@ -52,152 +34,19 @@ public class MetricsAggregatorS3Client { private static final Logger LOG = LoggerFactory.getLogger(MetricsAggregatorS3Client.class); - private static final Gson GSON = new Gson(); - private final AtomicInteger numberOfDirectoriesProcessed = new AtomicInteger(0); - private final AtomicInteger numberOfVersionsSubmitted = new AtomicInteger(0); - private final AtomicInteger numberOfVersionsSkipped = new AtomicInteger(0); private final String bucketName; private final S3Client s3Client; - private final MetricsDataS3Client metricsDataS3Client; public MetricsAggregatorS3Client(String bucketName) { this.bucketName = bucketName; this.s3Client = S3ClientHelper.getS3Client(); - this.metricsDataS3Client = new MetricsDataS3Client(bucketName, this.s3Client); } public MetricsAggregatorS3Client(String bucketName, String s3EndpointOverride) throws URISyntaxException { this.bucketName = bucketName; this.s3Client = S3ClientHelper.createS3Client(s3EndpointOverride); - this.metricsDataS3Client = new MetricsDataS3Client(bucketName, s3EndpointOverride); - } - - public void aggregateMetrics(List s3DirectoriesToAggregate, ExtendedGa4GhApi extendedGa4GhApi, boolean skipDockstore) { - s3DirectoriesToAggregate.stream().parallel().forEach(directoryInfo -> aggregateMetricsForDirectory(directoryInfo, extendedGa4GhApi, skipDockstore)); - LOG.info("Completed aggregating metrics. Processed {} directories, submitted {} platform metrics, and skipped {} platform metrics", numberOfDirectoriesProcessed, - numberOfVersionsSubmitted, - numberOfVersionsSkipped); - } - - private void aggregateMetricsForDirectory(S3DirectoryInfo directoryInfo, ExtendedGa4GhApi extendedGa4GhApi, boolean skipDockstore) { - LOG.info("Processing directory {}", directoryInfo); - String toolId = directoryInfo.toolId(); - String versionName = directoryInfo.versionId(); - List platforms = directoryInfo.platforms(); - String versionS3KeyPrefix = directoryInfo.versionS3KeyPrefix(); - - // Collect metrics for each platform, so we can calculate metrics across all platforms - Map platformToMetrics = new HashMap<>(); - for (String platform : platforms) { - ExecutionsRequestBody allSubmissions; - try { - allSubmissions = getExecutions(toolId, versionName, platform); - } catch (Exception e) { - LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e); - numberOfVersionsSkipped.incrementAndGet(); - continue; // Continue aggregating metrics for other directories - } - - try { - Optional aggregatedPlatformMetric = getAggregatedMetrics(allSubmissions); - if (aggregatedPlatformMetric.isPresent()) { - LOG.info("Aggregated metrics for tool ID {}, version {}, platform {} from directory {}", toolId, versionName, platform, - versionS3KeyPrefix); - platformToMetrics.put(platform, aggregatedPlatformMetric.get()); - } else { - LOG.error("Error aggregating metrics for tool ID {}, version {}, platform {} from directory {}", toolId, versionName, platform, versionS3KeyPrefix); - } - } catch (Exception e) { - LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e); - numberOfVersionsSkipped.incrementAndGet(); - // Continue aggregating metrics for other platforms - } - } - - if (!platformToMetrics.isEmpty()) { - // Calculate metrics across all platforms by aggregating the aggregated metrics from each platform - try { - getAggregatedMetrics(platformToMetrics.values().stream().toList()).ifPresent(metrics -> { - platformToMetrics.put(Partner.ALL.name(), metrics); - if (!skipDockstore) { - extendedGa4GhApi.aggregatedMetricsPut(platformToMetrics, toolId, versionName); - } - LOG.info("Aggregated metrics across all platforms ({}) for tool ID {}, version {} from directory {}", - platformToMetrics.keySet(), toolId, versionName, versionS3KeyPrefix); - - numberOfVersionsSubmitted.incrementAndGet(); - }); - } catch (Exception e) { - LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformToMetrics.keySet(), toolId, versionName, versionS3KeyPrefix, e); - numberOfVersionsSkipped.incrementAndGet(); - // Continue aggregating metrics for other directories - } - } else { - LOG.error("Error aggregating metrics for directory {}: no platform metrics aggregated", versionS3KeyPrefix); - numberOfVersionsSkipped.incrementAndGet(); - } - numberOfDirectoriesProcessed.incrementAndGet(); - LOG.info("Processed {} directories", numberOfDirectoriesProcessed); - } - - /** - * Get all executions from all submissions for the specific tool, version, and platform. - * If there are executions with the same execution ID, the function takes the newest execution. - * @param toolId - * @param versionName - * @param platform - * @return - */ - private ExecutionsRequestBody getExecutions(String toolId, String versionName, String platform) throws IOException, JsonSyntaxException { - // getMetricsData uses the S3 ListObjectsV2Request which returns objects in alphabetical order. - // Since the file names are the time of submission in milliseconds, metricsDataList is sorted from oldest file name to newest file name - List metricsDataList = metricsDataS3Client.getMetricsData(toolId, versionName, Partner.valueOf(platform)); - Map executionIdToWorkflowExecutionMap = new HashMap<>(); - Map executionIdToTaskExecutionsMap = new HashMap<>(); - Map executionIdToValidationExecutionMap = new HashMap<>(); - - for (MetricsData metricsData : metricsDataList) { - String fileContent = metricsDataS3Client.getMetricsDataFileContent(metricsData.toolId(), metricsData.toolVersionName(), - metricsData.platform(), metricsData.fileName()); - - ExecutionsRequestBody executionsFromOneSubmission; - try { - executionsFromOneSubmission = GSON.fromJson(fileContent, ExecutionsRequestBody.class); - } catch (JsonSyntaxException e) { - LOG.error("Could not read execution(s) from S3 key {}, ignoring file", metricsData.s3Key(), e); - continue; - } - - // For each execution, put it in a map so that there are no executions with duplicate execution IDs. - // The latest execution put in the map is the newest one based on the principal that S3 lists objects in alphabetical order, - // which is returned in an ordered list via getMetricsData. - executionsFromOneSubmission.getRunExecutions().forEach(workflowExecution -> { - final String executionId = workflowExecution.getExecutionId(); - executionIdToWorkflowExecutionMap.put(executionId, workflowExecution); - executionIdToValidationExecutionMap.remove(executionId); - executionIdToTaskExecutionsMap.remove(executionId); - }); - executionsFromOneSubmission.getTaskExecutions().forEach(taskExecutions -> { - final String executionId = taskExecutions.getExecutionId(); - executionIdToTaskExecutionsMap.put(executionId, taskExecutions); - executionIdToWorkflowExecutionMap.remove(executionId); - executionIdToValidationExecutionMap.remove(executionId); - }); - executionsFromOneSubmission.getValidationExecutions().forEach(validationExecution -> { - final String executionId = validationExecution.getExecutionId(); - executionIdToValidationExecutionMap.put(executionId, validationExecution); - executionIdToWorkflowExecutionMap.remove(executionId); - executionIdToTaskExecutionsMap.remove(executionId); - }); - } - - ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody(); - executionsRequestBody.setRunExecutions(executionIdToWorkflowExecutionMap.values().stream().toList()); - executionsRequestBody.setTaskExecutions(executionIdToTaskExecutionsMap.values().stream().toList()); - executionsRequestBody.setValidationExecutions(executionIdToValidationExecutionMap.values().stream().toList()); - return executionsRequestBody; } /** diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MoneyStatistics.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MoneyStatistics.java deleted file mode 100644 index 6b172ad5..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MoneyStatistics.java +++ /dev/null @@ -1,90 +0,0 @@ -package io.dockstore.metricsaggregator; - -import java.util.List; -import org.javamoney.moneta.Money; - -/** - * Calculates money statistics in USD using the Java Money library to preserve accuracy. - */ -public class MoneyStatistics extends Statistics { - public static final String CURRENCY = "USD"; - - private MoneyStatistics() { - super(); - } - - public MoneyStatistics(List dataPoints) { - super(dataPoints); - } - - public MoneyStatistics(Money minimum, Money maximum, Money average, int numberOfDataPoints) { - super(minimum, maximum, average, numberOfDataPoints); - } - - /** - * Create a new Statistics object from a list of statistics by aggregating the list of statistics - * @param statistics - * @return - */ - public static MoneyStatistics createFromStatistics(List statistics) { - if (statistics.size() == 1) { - return statistics.get(0); - } - - MoneyStatistics newStatistics = new MoneyStatistics(); - newStatistics.setAverage(statistics); - newStatistics.setMinimum(statistics); - newStatistics.setMaximum(statistics); - newStatistics.setNumberOfDataPoints(statistics); - return newStatistics; - } - - /** - * Get the lowest value from the list of data points. - * @param dataPoints - * @return - */ - @Override - public Money calculateMinimum(List dataPoints) { - return dataPoints.stream() - .min(Money::compareTo) - .orElse(Money.of(0, CURRENCY)); - } - - /** - * Get the highest value from the list of data points. - * @param dataPoints - * @return - */ - @Override - public Money calculateMaximum(List dataPoints) { - return dataPoints.stream() - .max(Money::compareTo) - .orElse(Money.of(0, CURRENCY)); - } - - /** - * Calculate the average from the list of data points. - * @param dataPoints - * @return - */ - @Override - public Money calculateAverage(List dataPoints) { - Money sum = dataPoints.stream().reduce(Money.of(0, CURRENCY), Money::add); - return sum.divide(dataPoints.size()); - } - - /** - * Calculate a weighted average - */ - @Override - public Money calculateWeightedAverage(List> statistics) { - int totalNumberOfDataPoints = getTotalNumberOfDataPoints(statistics); - return statistics.stream() - .map(stat -> { - double weight = (double)stat.getNumberOfDataPoints() / (double)totalNumberOfDataPoints; - return stat.getAverage().multiply(weight); - }) - .reduce(Money.of(0, CURRENCY), Money::add); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/Statistics.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/Statistics.java deleted file mode 100644 index 6e7e5fe2..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/Statistics.java +++ /dev/null @@ -1,104 +0,0 @@ -package io.dockstore.metricsaggregator; - -import java.util.List; - -/** - * A class that contains statistical information for a data type. - * @param - */ -public abstract class Statistics { - private T minimum; - private T maximum; - private T average; - private int numberOfDataPoints; - - protected Statistics() { - } - - protected Statistics(T minimum, T maximum, T average, int numberOfDataPoints) { - this.minimum = minimum; - this.maximum = maximum; - this.average = average; - this.numberOfDataPoints = numberOfDataPoints; - } - - protected Statistics(List dataPoints) { - this.minimum = calculateMinimum(dataPoints); - this.maximum = calculateMaximum(dataPoints); - this.average = calculateAverage(dataPoints); - this.numberOfDataPoints = dataPoints.size(); - } - - public abstract T calculateMinimum(List dataPoints); - public abstract T calculateMaximum(List dataPoints); - public abstract T calculateAverage(List dataPoints); - public abstract T calculateWeightedAverage(List> statistics); - - public T getMinimum() { - return minimum; - } - - public void setMinimum(T min) { - this.minimum = min; - } - - public void setMinimum(List> statistics) { - List dataPoints = statistics.stream() - .map(Statistics::getMinimum) - .toList(); - this.minimum = calculateMinimum(dataPoints); - } - - public T getMaximum() { - return maximum; - } - - public void setMaximum(T max) { - this.maximum = max; - } - - public void setMaximum(List> statistics) { - List dataPoints = statistics.stream() - .map(Statistics::getMaximum) - .toList(); - this.maximum = calculateMaximum(dataPoints); - } - - public T getAverage() { - return average; - } - - public void setAverage(T average) { - this.average = average; - } - - /** - * Sets the average by calculating the weighted average from a list of statistics - * @param statistics - */ - public void setAverage(List> statistics) { - this.average = calculateWeightedAverage(statistics); - } - - public int getNumberOfDataPoints() { - return numberOfDataPoints; - } - - public void setNumberOfDataPoints(int numberOfDataPoints) { - this.numberOfDataPoints = numberOfDataPoints; - } - - public void setNumberOfDataPoints(List> statistics) { - this.numberOfDataPoints = statistics.stream() - .map(Statistics::getNumberOfDataPoints) - .mapToInt(Integer::intValue) - .sum(); - } - - public int getTotalNumberOfDataPoints(List> statistics) { - return statistics.stream() - .map(Statistics::getNumberOfDataPoints) - .mapToInt(Integer::intValue) - .sum(); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java index 95d6b8be..a96e5557 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/CommandLineArgs.java @@ -37,23 +37,19 @@ public static class AggregateMetricsCommand extends CommandLineArgs { @Parameter(names = {"-c", "--config"}, description = "The config file path.") private File config = new File("./" + MetricsAggregatorClient.CONFIG_FILE_NAME); - @Parameter(names = {"--athena"}, description = "Aggregate metrics in S3 using AWS Athena") - private boolean withAthena = false; - @Parameter(names = {"--skipDockstore"}, description = "Skip posting the metrics to Dockstore") private boolean skipDockstore = false; @Parameter(names = {"--trsIds"}, description = "Aggregate metrics for the tools specified TRS ID") private List trsIds; + @Parameter(names = { "--dryRun" }, description = "Do a dry run with Athena by only generating the queries") + private boolean dryRun = false; + public File getConfig() { return config; } - public boolean isWithAthena() { - return withAthena; - } - public boolean isSkipDockstore() { return skipDockstore; } @@ -61,6 +57,10 @@ public boolean isSkipDockstore() { public List getTrsIds() { return trsIds; } + + public boolean isDryRun() { + return dryRun; + } } @Parameters(commandNames = { "submit-validation-data" }, commandDescription = "Formats workflow validation data specified in a file then submits it to Dockstore") diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java index 93018528..6e813bce 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClient.java @@ -161,7 +161,11 @@ private void aggregateMetrics(AggregateMetricsCommand aggregateMetricsCommand, M } else { metricsAggregatorS3Client = new MetricsAggregatorS3Client(config.getS3Config().bucket(), config.getS3Config().endpointOverride()); } - LOG.info("Aggregating metrics with {}. Submitting metrics to Dockstore is {}", aggregateMetricsCommand.isWithAthena() ? "AWS Athena" : "Java", skipPostingToDockstore ? "skipped" : "enabled"); + + if (aggregateMetricsCommand.isDryRun()) { + LOG.info("Executing dry run"); + } + LOG.info("Submitting metrics to Dockstore is {}", skipPostingToDockstore ? "skipped" : "enabled"); final Instant getDirectoriesStartTime = Instant.now(); List s3DirectoriesToAggregate; @@ -186,12 +190,10 @@ private void aggregateMetrics(AggregateMetricsCommand aggregateMetricsCommand, M return; } - if (aggregateMetricsCommand.isWithAthena()) { - MetricsAggregatorAthenaClient metricsAggregatorAthenaClient = new MetricsAggregatorAthenaClient(config); - metricsAggregatorAthenaClient.aggregateMetrics(s3DirectoriesToAggregate, extendedGa4GhApi, skipPostingToDockstore); - } else { - metricsAggregatorS3Client.aggregateMetrics(s3DirectoriesToAggregate, extendedGa4GhApi, skipPostingToDockstore); - } + MetricsAggregatorAthenaClient metricsAggregatorAthenaClient = new MetricsAggregatorAthenaClient(config, + aggregateMetricsCommand.isDryRun()); + metricsAggregatorAthenaClient.aggregateMetrics(s3DirectoriesToAggregate, extendedGa4GhApi, skipPostingToDockstore, + aggregateMetricsCommand.isDryRun()); } private void submitValidationData(MetricsAggregatorConfig config, ValidatorToolEnum validator, String validatorVersion, String dataFilePath, Partner platform, String executionId) throws IOException { diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AggregationHelper.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AggregationHelper.java deleted file mode 100644 index d93f9acf..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AggregationHelper.java +++ /dev/null @@ -1,75 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import io.dockstore.common.metrics.ExecutionsRequestBody; -import io.dockstore.openapi.client.model.ExecutionStatusMetric; -import io.dockstore.openapi.client.model.Metrics; -import io.dockstore.openapi.client.model.ValidationStatusMetric; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class AggregationHelper { - private static final Logger LOG = LoggerFactory.getLogger(AggregationHelper.class); - - private AggregationHelper() { - } - - /** - * Aggregate metrics from all submissions. - * - * @param allSubmissions - * @return Metrics object containing aggregated metrics - */ - public static Optional getAggregatedMetrics(ExecutionsRequestBody allSubmissions) { - Metrics aggregatedMetrics = new Metrics(); - // Set run metrics - Optional aggregatedExecutionStatus = new ExecutionStatusAggregator().getAggregatedMetricFromAllSubmissions(allSubmissions); - boolean containsRunMetrics = aggregatedExecutionStatus.isPresent(); - aggregatedExecutionStatus.ifPresent(aggregatedMetrics::setExecutionStatusCount); - - // Set validation metrics - Optional aggregatedValidationStatus = new ValidationStatusAggregator().getAggregatedMetricFromAllSubmissions(allSubmissions); - boolean containsValidationMetrics = aggregatedValidationStatus.isPresent(); - aggregatedValidationStatus.ifPresent(aggregatedMetrics::setValidationStatus); - - // Only return aggregated metrics if it contains either run metrics or validation metrics - if (containsRunMetrics || containsValidationMetrics) { - return Optional.of(aggregatedMetrics); - } - - return Optional.empty(); - } - - /** - * Aggregates metrics into a single metric - * @param aggregatedMetrics - * @return - */ - public static Optional getAggregatedMetrics(List aggregatedMetrics) { - Metrics overallMetrics = new Metrics(); - // Set run metrics - Optional aggregatedExecutionStatus = new ExecutionStatusAggregator().getAggregatedMetricsFromAggregatedMetrics(aggregatedMetrics.stream() - .map(Metrics::getExecutionStatusCount) - .filter(Objects::nonNull) - .toList()); - boolean containsRunMetrics = aggregatedExecutionStatus.isPresent(); - aggregatedExecutionStatus.ifPresent(overallMetrics::setExecutionStatusCount); - - // Set validation metrics - Optional aggregatedValidationStatus = new ValidationStatusAggregator().getAggregatedMetricsFromAggregatedMetrics(aggregatedMetrics.stream() - .map(Metrics::getValidationStatus) - .filter(Objects::nonNull) - .toList()); - boolean containsValidationMetrics = aggregatedValidationStatus.isPresent(); - aggregatedValidationStatus.ifPresent(overallMetrics::setValidationStatus); - - // Only return aggregated metrics if it contains either run metrics or validation metrics - if (containsRunMetrics || containsValidationMetrics) { - return Optional.of(overallMetrics); - } - - return Optional.empty(); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AthenaAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AthenaAggregator.java index 58660b16..3993ecc5 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AthenaAggregator.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/AthenaAggregator.java @@ -77,6 +77,10 @@ public Map createMetricByPlatform(AthenaTablePartition partition) { return createMetricByPlatform(queryResultRows); } + public void printQuery(AthenaTablePartition partition) { + LOG.info(createQuery(partition)); + } + /** * Get the platform column value from the query result row * @param queryResultRow diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CostAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CostAggregator.java deleted file mode 100644 index cacfafe2..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CostAggregator.java +++ /dev/null @@ -1,89 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static io.dockstore.common.metrics.FormatCheckHelper.isValidCurrencyCode; -import static io.dockstore.metricsaggregator.MoneyStatistics.CURRENCY; - -import io.dockstore.common.metrics.Cost; -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.metricsaggregator.MoneyStatistics; -import io.dockstore.openapi.client.model.CostMetric; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import org.javamoney.moneta.Money; - -public class CostAggregator extends RunExecutionAggregator { - - @Override - public Cost getMetricFromExecution(RunExecution execution) { - return execution.getCost(); - } - - @Override - public boolean validateExecutionMetric(Cost executionMetric) { - return executionMetric != null && isValidCurrencyCode(executionMetric.getCurrency()) && executionMetric.getValue() >= 0; - } - - @Override - public String getPropertyPathToValidate() { - return "cost"; - } - - @Override - public Optional getWorkflowExecutionFromTaskExecutions(TaskExecutions taskExecutionsForOneWorkflowRun) { - final List taskExecutions = taskExecutionsForOneWorkflowRun.getTaskExecutions(); - if (!taskExecutions.isEmpty() && taskExecutions.stream().map(RunExecution::getCost).allMatch(Objects::nonNull)) { - // Get the overall cost by summing up the cost of each task - List taskCosts = taskExecutions.stream() - .map(RunExecution::getCost) - .filter(Objects::nonNull) - .toList(); - if (!taskCosts.isEmpty()) { - Money totalCost = taskCosts.stream() - .map(cost -> Money.of(cost.getValue(), cost.getCurrency())) - .reduce(Money.of(0, CURRENCY), Money::add); - RunExecution workflowExecution = new RunExecution(); - workflowExecution.setCost(new Cost(totalCost.getNumber().doubleValue())); - return Optional.of(workflowExecution); - } - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromExecutionMetrics(List executionMetrics) { - if (!executionMetrics.isEmpty()) { - List costs = executionMetrics.stream() - .map(cost -> Money.of(cost.getValue(), cost.getCurrency())) - .toList(); - MoneyStatistics statistics = new MoneyStatistics(costs); - return Optional.of(new CostMetric() - .minimum(statistics.getMinimum().getNumber().doubleValue()) - .maximum(statistics.getMaximum().getNumber().doubleValue()) - .average(statistics.getAverage().getNumber().doubleValue()) - .numberOfDataPointsForAverage(statistics.getNumberOfDataPoints())); - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromAggregatedMetrics(List aggregatedMetrics) { - if (!aggregatedMetrics.isEmpty()) { - List statistics = aggregatedMetrics.stream() - .map(metric -> new MoneyStatistics( - Money.of(metric.getMinimum(), metric.getUnit()), - Money.of(metric.getMaximum(), metric.getUnit()), - Money.of(metric.getAverage(), metric.getUnit()), - metric.getNumberOfDataPointsForAverage())) - .toList(); - MoneyStatistics moneyStatistics = MoneyStatistics.createFromStatistics(statistics); - return Optional.of(new CostMetric() - .minimum(moneyStatistics.getMinimum().getNumber().doubleValue()) - .maximum(moneyStatistics.getMaximum().getNumber().doubleValue()) - .average(moneyStatistics.getAverage().getNumber().doubleValue()) - .numberOfDataPointsForAverage(moneyStatistics.getNumberOfDataPoints())); - } - return Optional.empty(); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CpuAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CpuAggregator.java deleted file mode 100644 index 1db92a9f..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/CpuAggregator.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.metricsaggregator.DoubleStatistics; -import io.dockstore.openapi.client.model.CpuMetric; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -/** - * Aggregate CPU metrics by calculating the minimum, maximum, and average. - * @return - */ -public class CpuAggregator extends RunExecutionAggregator { - - @Override - public Integer getMetricFromExecution(RunExecution execution) { - return execution.getCpuRequirements(); - } - - @Override - public boolean validateExecutionMetric(Integer executionMetric) { - return executionMetric != null && executionMetric >= 0; - } - - @Override - public String getPropertyPathToValidate() { - return "cpuRequirements"; - } - - @Override - public Optional getWorkflowExecutionFromTaskExecutions(TaskExecutions taskExecutionsForOneWorkflowRun) { - final List taskExecutions = taskExecutionsForOneWorkflowRun.getTaskExecutions(); - if (!taskExecutions.isEmpty() && taskExecutions.stream().map(RunExecution::getCpuRequirements).allMatch(Objects::nonNull)) { - // Get the overall CPU requirement by getting the maximum CPU value used - final Optional maxCpuRequirement = taskExecutions.stream() - .map(RunExecution::getCpuRequirements) - .filter(Objects::nonNull) - .max(Integer::compareTo); - if (maxCpuRequirement.isPresent()) { - RunExecution workflowExecution = new RunExecution(); - workflowExecution.setCpuRequirements(maxCpuRequirement.get()); - return Optional.ofNullable(workflowExecution); - } - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromExecutionMetrics(List executionMetrics) { - List cpuRequirements = executionMetrics.stream() - .map(Integer::doubleValue) - .toList(); - if (!cpuRequirements.isEmpty()) { - DoubleStatistics statistics = new DoubleStatistics(cpuRequirements); - return Optional.of(new CpuMetric() - .minimum(statistics.getMinimum()) - .maximum(statistics.getMaximum()) - .average(statistics.getAverage()) - .numberOfDataPointsForAverage(statistics.getNumberOfDataPoints())); - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromAggregatedMetrics(List aggregatedMetrics) { - if (!aggregatedMetrics.isEmpty()) { - List statistics = aggregatedMetrics.stream() - .map(metric -> new DoubleStatistics(metric.getMinimum(), metric.getMaximum(), metric.getAverage(), metric.getNumberOfDataPointsForAverage())).toList(); - DoubleStatistics newStatistic = DoubleStatistics.createFromStatistics(statistics); - return Optional.of(new CpuMetric() - .minimum(newStatistic.getMinimum()) - .maximum(newStatistic.getMaximum()) - .average(newStatistic.getAverage()) - .numberOfDataPointsForAverage(newStatistic.getNumberOfDataPoints())); - } - return Optional.empty(); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionAggregator.java deleted file mode 100644 index bb9f1f10..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionAggregator.java +++ /dev/null @@ -1,109 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import io.dockstore.common.metrics.Execution; -import io.dockstore.common.metrics.ExecutionsRequestBody; -import io.dockstore.openapi.client.model.Metric; -import jakarta.validation.ConstraintViolation; -import jakarta.validation.Validation; -import jakarta.validation.Validator; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; - -/** - * A class defining the methods needed to aggregate workflow executions into aggregated metrics. - * @param The type of execution, example: RunExecution or ValidationExecution, that contains the metric to aggregate - * @param The aggregated metric from the Metrics class, a class containing multiple types of aggregated metrics - * @param The execution metric to aggregate from the Execution - */ -public abstract class ExecutionAggregator { - - private final Validator validator = Validation.buildDefaultValidatorFactory().getValidator(); - - /** - * Get the metric to aggregate from a single workflow execution. - * @param execution - * @return - */ - public abstract E getMetricFromExecution(T execution); - - /** - * Returns a boolean indicating if the execution metric is valid. - * @param executionMetric - * @return - */ - public abstract boolean validateExecutionMetric(E executionMetric); - - /** - * Aggregates workflow executions into an aggregated metric. - * @param executionMetrics - * @return - */ - protected abstract Optional calculateAggregatedMetricFromExecutionMetrics(List executionMetrics); - - /** - * Aggregates a list of aggregated metrics into one aggregated metric. - * @param aggregatedMetrics - * @return - */ - protected abstract Optional calculateAggregatedMetricFromAggregatedMetrics(List aggregatedMetrics); - - /** - * Aggregate metrics from all submissions in the ExecutionsRequestBody. - * @param allSubmissions - * @return - */ - public abstract Optional getAggregatedMetricFromAllSubmissions(ExecutionsRequestBody allSubmissions); - - /** - * Returns the property name of the metric in the Execution class to validate. - * This is used to determine if a Java Bean validation violation is for this particular execution metric. - * @return - */ - public abstract String getPropertyPathToValidate(); - - /** - * Aggregates workflow executions into an aggregated metric and calculates the number of skipped executions. - * @param executions - * @return - */ - - public final Optional getAggregatedMetricFromExecutions(List executions) { - final List executionsWithNonNullMetric = executions.stream().filter(execution -> getMetricFromExecution(execution) != null).toList(); - final List validExecutionMetrics = executionsWithNonNullMetric.stream().filter(this::isValid).map(this::getMetricFromExecution).toList(); - if (!validExecutionMetrics.isEmpty()) { - Optional calculatedMetric = calculateAggregatedMetricFromExecutionMetrics(validExecutionMetrics); - final int numberOfSkippedExecutions = executionsWithNonNullMetric.size() - validExecutionMetrics.size(); - calculatedMetric.ifPresent(metric -> metric.setNumberOfSkippedExecutions(numberOfSkippedExecutions)); - return calculatedMetric; - } - return Optional.empty(); - } - - /** - * Aggregates a list of aggregated metrics into one aggregated metric and calculates the number of skipped executions. - * @param aggregatedMetrics - * @return - */ - public final Optional getAggregatedMetricsFromAggregatedMetrics(List aggregatedMetrics) { - if (!aggregatedMetrics.isEmpty()) { - Optional calculatedMetric = calculateAggregatedMetricFromAggregatedMetrics(aggregatedMetrics.stream().filter(Objects::nonNull).toList()); - // Sum number of skipped executions from the aggregated metrics - final int numberOfSkippedExecutions = aggregatedMetrics.stream().map(Metric::getNumberOfSkippedExecutions).reduce(0, Integer::sum); - calculatedMetric.ifPresent(metric -> metric.setNumberOfSkippedExecutions(numberOfSkippedExecutions)); - return calculatedMetric; - } - return Optional.empty(); - } - - /** - * Validate executions using Java Bean Validation - * @param execution - * @return - */ - public boolean isValid(T execution) { - Set> violations = validator.validate(execution); - return violations.stream().noneMatch(violation -> violation.getPropertyPath().toString().startsWith(getPropertyPathToValidate())); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregator.java deleted file mode 100644 index 8f3e3cac..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregator.java +++ /dev/null @@ -1,164 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static java.util.stream.Collectors.groupingBy; - -import io.dockstore.common.metrics.ExecutionStatus; -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.openapi.client.model.ExecutionStatusMetric; -import io.dockstore.openapi.client.model.MetricsByStatus; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - -/** - * Aggregate Execution Status metrics by summing up the count of each Execution Status. - */ -public class ExecutionStatusAggregator extends RunExecutionAggregator { - // Aggregators used to calculate metrics by execution status - private final ExecutionTimeAggregator executionTimeAggregator = new ExecutionTimeAggregator(); - private final CpuAggregator cpuAggregator = new CpuAggregator(); - private final MemoryAggregator memoryAggregator = new MemoryAggregator(); - private final CostAggregator costAggregator = new CostAggregator(); - - @Override - public boolean validateExecutionMetric(RunExecution executionMetric) { - return executionMetric.getExecutionStatus() != null; - } - - @Override - public String getPropertyPathToValidate() { - return "executionStatus"; - } - - @Override - public RunExecution getMetricFromExecution(RunExecution execution) { - return execution; // Uses fields from the entire execution - } - - @Override - public Optional getWorkflowExecutionFromTaskExecutions(TaskExecutions taskExecutionsForOneWorkflowRun) { - final List taskExecutions = taskExecutionsForOneWorkflowRun.getTaskExecutions(); - RunExecution workflowExecution = new RunExecution(); - if (!taskExecutions.isEmpty() && taskExecutions.stream().map(RunExecution::getExecutionStatus).allMatch(Objects::nonNull)) { - if (taskExecutions.stream().allMatch(taskRunExecution -> taskRunExecution.getExecutionStatus() == ExecutionStatus.SUCCESSFUL)) { - // All executions were successful - workflowExecution.setExecutionStatus(ExecutionStatus.SUCCESSFUL); - } else { - // If there were failed executions, set the overall status to the most frequent failed status - Optional mostFrequentFailedStatus = taskExecutions.stream() - .map(RunExecution::getExecutionStatus) - .filter(taskExecutionStatus -> taskExecutionStatus != ExecutionStatus.SUCCESSFUL) - .collect(groupingBy(Function.identity(), Collectors.reducing(0, e -> 1, Integer::sum))) - .entrySet() - .stream() - .max(Entry.comparingByValue()) - .map(Entry::getKey); - mostFrequentFailedStatus.ifPresent(workflowExecution::setExecutionStatus); - } - - if (workflowExecution.getExecutionStatus() != null) { - executionTimeAggregator.getWorkflowExecutionFromTaskExecutions(taskExecutionsForOneWorkflowRun).ifPresent(executionWithTime -> workflowExecution.setExecutionTime(executionWithTime.getExecutionTime())); - cpuAggregator.getWorkflowExecutionFromTaskExecutions(taskExecutionsForOneWorkflowRun).ifPresent(executionWithCpu -> workflowExecution.setCpuRequirements(executionWithCpu.getCpuRequirements())); - memoryAggregator.getWorkflowExecutionFromTaskExecutions(taskExecutionsForOneWorkflowRun).ifPresent(executionWithMemory -> workflowExecution.setMemoryRequirementsGB(executionWithMemory.getMemoryRequirementsGB())); - costAggregator.getWorkflowExecutionFromTaskExecutions(taskExecutionsForOneWorkflowRun).ifPresent(executionWithCost -> workflowExecution.setCost(executionWithCost.getCost())); - return Optional.of(workflowExecution); - } - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromExecutionMetrics(List executionMetrics) { - if (!executionMetrics.isEmpty()) { - Map> executionsByStatus = executionMetrics.stream() - .collect(groupingBy(RunExecution::getExecutionStatus)); - - ExecutionStatusMetric executionStatusMetric = new ExecutionStatusMetric(); - executionsByStatus.forEach((status, executionsForStatus) -> { - MetricsByStatus metricsByStatus = getMetricsByStatusFromExecutions(executionsForStatus); - executionStatusMetric.getCount().put(status.toString(), metricsByStatus); - }); - - // Figure out metrics over all statuses - MetricsByStatus overallMetricsByStatus = getMetricsByStatusFromExecutions(executionsByStatus.values().stream().flatMap(Collection::stream).toList()); - executionStatusMetric.getCount().put(ExecutionStatus.ALL.name(), overallMetricsByStatus); - - return Optional.of(executionStatusMetric); - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromAggregatedMetrics(List aggregatedMetrics) { - if (!aggregatedMetrics.isEmpty()) { - Map> statusToMetricsByStatus = aggregatedMetrics.stream() - .filter(Objects::nonNull) - .map(executionStatusMetric -> executionStatusMetric.getCount().entrySet()) - .flatMap(Collection::stream) - .collect(groupingBy(Map.Entry::getKey, Collectors.mapping(Entry::getValue, Collectors.toList()))); - - if (statusToMetricsByStatus.isEmpty()) { - return Optional.empty(); - } - - ExecutionStatusMetric executionStatusMetric = new ExecutionStatusMetric(); - statusToMetricsByStatus.forEach((status, metricsForStatus) -> { - MetricsByStatus metricsByStatus = getMetricsByStatusFromMetricsByStatusList(metricsForStatus); - executionStatusMetric.getCount().put(status, metricsByStatus); - }); - - // Calculate metrics over all statuses - // Calculate from previous ALL MetricsByStatus (aggregate using aggregated data) - List metricsByStatusesToCalculateAllStatus = statusToMetricsByStatus.get(ExecutionStatus.ALL.name()); - if (metricsByStatusesToCalculateAllStatus == null) { - // If there's no ALL key, calculate from other statuses - metricsByStatusesToCalculateAllStatus = statusToMetricsByStatus.values().stream().flatMap(Collection::stream).toList(); - } - MetricsByStatus overallMetricsByStatus = getMetricsByStatusFromMetricsByStatusList(metricsByStatusesToCalculateAllStatus); - executionStatusMetric.getCount().put(ExecutionStatus.ALL.name(), overallMetricsByStatus); - return Optional.of(executionStatusMetric); - } - return Optional.empty(); - } - - /** - * Aggregate executions into a MetricsByStatus object. Assumes that all executions have the status - * @param executions - * @return - */ - private MetricsByStatus getMetricsByStatusFromExecutions(List executions) { - MetricsByStatus metricsByStatus = new MetricsByStatus() - .executionStatusCount(executions.size()); - - // Figure out metrics by status - executionTimeAggregator.getAggregatedMetricFromExecutions(executions).ifPresent(metricsByStatus::setExecutionTime); - cpuAggregator.getAggregatedMetricFromExecutions(executions).ifPresent(metricsByStatus::setCpu); - memoryAggregator.getAggregatedMetricFromExecutions(executions).ifPresent(metricsByStatus::setMemory); - costAggregator.getAggregatedMetricFromExecutions(executions).ifPresent(metricsByStatus::setCost); - return metricsByStatus; - } - - /** - * Aggregate a list of MetricsByStatus objects into a MetricsByStatus object. Assumes that all executions have the same status - * @param metricsByStatuses - * @return - */ - private MetricsByStatus getMetricsByStatusFromMetricsByStatusList(List metricsByStatuses) { - final int totalCountForStatus = metricsByStatuses.stream().map(MetricsByStatus::getExecutionStatusCount).reduce(0, Integer::sum); - MetricsByStatus metricsByStatus = new MetricsByStatus() - .executionStatusCount(totalCountForStatus); - - // Figure out metrics by status - executionTimeAggregator.getAggregatedMetricsFromAggregatedMetrics(metricsByStatuses.stream().map(MetricsByStatus::getExecutionTime).filter(Objects::nonNull).toList()).ifPresent(metricsByStatus::setExecutionTime); - cpuAggregator.getAggregatedMetricsFromAggregatedMetrics(metricsByStatuses.stream().map(MetricsByStatus::getCpu).filter(Objects::nonNull).toList()).ifPresent(metricsByStatus::setCpu); - memoryAggregator.getAggregatedMetricsFromAggregatedMetrics(metricsByStatuses.stream().map(MetricsByStatus::getMemory).filter(Objects::nonNull).toList()).ifPresent(metricsByStatus::setMemory); - costAggregator.getAggregatedMetricsFromAggregatedMetrics(metricsByStatuses.stream().map(MetricsByStatus::getCost).filter(Objects::nonNull).toList()).ifPresent(metricsByStatus::setCost); - return metricsByStatus; - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregator.java deleted file mode 100644 index 8caf725b..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregator.java +++ /dev/null @@ -1,115 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static io.dockstore.common.metrics.FormatCheckHelper.checkExecutionDateISO8601Format; -import static io.dockstore.common.metrics.FormatCheckHelper.checkExecutionTimeISO8601Format; - -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.metricsaggregator.DoubleStatistics; -import io.dockstore.openapi.client.model.ExecutionTimeMetric; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Comparator; -import java.util.Date; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -/** - * Aggregate Execution Time metrics by calculating the minimum, maximum, and average. - */ -public final class ExecutionTimeAggregator extends RunExecutionAggregator { - @Override - public String getMetricFromExecution(RunExecution execution) { - return execution.getExecutionTime(); - } - - @Override - public boolean validateExecutionMetric(String executionMetric) { - return checkExecutionTimeISO8601Format(executionMetric).isPresent(); - } - - @Override - public String getPropertyPathToValidate() { - return "executionTime"; - } - - @Override - public Optional getWorkflowExecutionFromTaskExecutions(TaskExecutions taskExecutionsForOneWorkflowRun) { - final List taskExecutions = taskExecutionsForOneWorkflowRun.getTaskExecutions(); - if (!taskExecutions.isEmpty() && taskExecutions.stream().map(RunExecution::getExecutionTime).allMatch(Objects::nonNull)) { - RunExecution workflowExecution = new RunExecution(); - // We cannot calculate the overall total time from RunExecution's executionTime, which is in ISO 8601 duration format. - // Calculate a best guess using RunExecution's dateExecuted, which is in ISO 8601 date format - if (taskExecutions.size() == 1 && taskExecutions.get(0).getExecutionTime() != null) { - // If there's only one task, set the workflow-level execution time to be the execution time of the single task - workflowExecution.setExecutionTime(taskExecutions.get(0).getExecutionTime()); - return Optional.of(workflowExecution); - - // Calculate a duration if all task executions have a valid dateExecuted - } else if (taskExecutions.stream().allMatch(execution -> checkExecutionDateISO8601Format(execution.getDateExecuted()).isPresent())) { - // Find the earliest date executed and latest date executed to calculate a duration estimate - final Optional earliestTaskExecutionDate = taskExecutions.stream() - .map(execution -> checkExecutionDateISO8601Format(execution.getDateExecuted()).get()) - .min(Date::compareTo); - final Optional latestTaskExecutionDate = taskExecutions.stream() - .map(execution -> checkExecutionDateISO8601Format(execution.getDateExecuted()).get()) - .max(Date::compareTo); - final Optional latestTaskExecuted = taskExecutions.stream() - .max(Comparator.comparing(execution -> checkExecutionDateISO8601Format(execution.getDateExecuted()).get(), Date::compareTo)); - - if (earliestTaskExecutionDate.isPresent() && latestTaskExecutionDate.isPresent() && latestTaskExecuted.isPresent()) { - // Execution dates are the start dates, calculate a rough duration from the execution dates of the earliest and latest tasks - long durationInMs = latestTaskExecutionDate.get().getTime() - earliestTaskExecutionDate.get().getTime(); - Duration duration = Duration.of(durationInMs, ChronoUnit.MILLIS); - // If the execution time of the latest task is present, add that to the duration to account for the amount of time the last task took to execute - Optional latestTaskExecutionTime = checkExecutionTimeISO8601Format(latestTaskExecuted.get().getExecutionTime()); - if (latestTaskExecutionTime.isPresent()) { - duration = duration.plus(latestTaskExecutionTime.get()); - } - workflowExecution.setExecutionTime(duration.toString()); - return Optional.of(workflowExecution); - } - } - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromExecutionMetrics(List executionMetrics) { - List executionTimesInSeconds = executionMetrics.stream() - .map(executionTime -> { - // Convert executionTime in ISO 8601 duration format to seconds - Duration parsedISO8601ExecutionTime = checkExecutionTimeISO8601Format(executionTime).get(); - return (double) parsedISO8601ExecutionTime.toSeconds(); - }) - .toList(); - - if (!executionTimesInSeconds.isEmpty()) { - DoubleStatistics statistics = new DoubleStatistics(executionTimesInSeconds); - return Optional.of(new ExecutionTimeMetric() - .minimum(statistics.getMinimum()) - .maximum(statistics.getMaximum()) - .average(statistics.getAverage()) - .numberOfDataPointsForAverage(statistics.getNumberOfDataPoints())); - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromAggregatedMetrics(List aggregatedMetrics) { - if (!aggregatedMetrics.isEmpty()) { - List statistics = aggregatedMetrics.stream() - .map(metric -> new DoubleStatistics(metric.getMinimum(), metric.getMaximum(), metric.getAverage(), metric.getNumberOfDataPointsForAverage())) - .toList(); - - DoubleStatistics newStatistic = DoubleStatistics.createFromStatistics(statistics); - return Optional.of(new ExecutionTimeMetric() - .minimum(newStatistic.getMinimum()) - .maximum(newStatistic.getMaximum()) - .average(newStatistic.getAverage()) - .numberOfDataPointsForAverage(newStatistic.getNumberOfDataPoints())); - } - return Optional.empty(); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/MemoryAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/MemoryAggregator.java deleted file mode 100644 index e675de44..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/MemoryAggregator.java +++ /dev/null @@ -1,76 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.metricsaggregator.DoubleStatistics; -import io.dockstore.openapi.client.model.MemoryMetric; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -/** - * Aggregate Memory metrics by calculating the minimum, maximum, and average. - */ -public class MemoryAggregator extends RunExecutionAggregator { - - @Override - public Double getMetricFromExecution(RunExecution execution) { - return execution.getMemoryRequirementsGB(); - } - - @Override - public boolean validateExecutionMetric(Double executionMetric) { - return executionMetric != null && executionMetric >= 0; - } - - @Override - public String getPropertyPathToValidate() { - return "memoryRequirementsGB"; - } - - @Override - public Optional getWorkflowExecutionFromTaskExecutions(TaskExecutions taskExecutionsForOneWorkflowRun) { - final List taskExecutions = taskExecutionsForOneWorkflowRun.getTaskExecutions(); - if (!taskExecutions.isEmpty() && taskExecutions.stream().map(RunExecution::getMemoryRequirementsGB).allMatch(Objects::nonNull)) { - // Get the overall memory requirement by getting the maximum memory value used - final Optional maxMemoryRequirement = taskExecutions.stream() - .map(RunExecution::getMemoryRequirementsGB) - .filter(Objects::nonNull) - .max(Double::compareTo); - if (maxMemoryRequirement.isPresent()) { - RunExecution workflowExecution = new RunExecution(); - workflowExecution.setMemoryRequirementsGB(maxMemoryRequirement.get()); - return Optional.of(workflowExecution); - } - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromExecutionMetrics(List executionMetrics) { - if (!executionMetrics.isEmpty()) { - DoubleStatistics statistics = new DoubleStatistics(executionMetrics); - return Optional.of(new MemoryMetric() - .minimum(statistics.getMinimum()) - .maximum(statistics.getMaximum()) - .average(statistics.getAverage()) - .numberOfDataPointsForAverage(statistics.getNumberOfDataPoints())); - } - return Optional.empty(); - } - - @Override - protected Optional calculateAggregatedMetricFromAggregatedMetrics(List aggregatedMetrics) { - if (!aggregatedMetrics.isEmpty()) { - List statistics = aggregatedMetrics.stream() - .map(metric -> new DoubleStatistics(metric.getMinimum(), metric.getMaximum(), metric.getAverage(), metric.getNumberOfDataPointsForAverage())).toList(); - DoubleStatistics newStatistic = DoubleStatistics.createFromStatistics(statistics); - return Optional.of(new MemoryMetric() - .minimum(newStatistic.getMinimum()) - .maximum(newStatistic.getMaximum()) - .average(newStatistic.getAverage()) - .numberOfDataPointsForAverage(newStatistic.getNumberOfDataPoints())); - } - return Optional.empty(); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAggregator.java deleted file mode 100644 index 65d95e6d..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAggregator.java +++ /dev/null @@ -1,62 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import io.dockstore.common.metrics.ExecutionsRequestBody; -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.openapi.client.model.Metric; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -/** - * A class defining the methods needed to aggregate workflow RunExecutions into aggregated metrics. - * @param The aggregated metric from the Metrics class, a class containing multiple types of aggregated metrics - * @param The execution metric to aggregate from the Execution - */ -public abstract class RunExecutionAggregator extends ExecutionAggregator { - - public List getTaskExecutionsWithMetric(List taskExecutionsList) { - return taskExecutionsList.stream().filter(taskExecutions -> taskExecutions.getTaskExecutions().stream().map( - this::getMetricFromExecution).allMatch(Objects::nonNull)).toList(); - } - - /** - * Aggregates TaskExecutions that belong to a single workflow run into a workflow-level RunExecution. - * Does NOT check that the resulting workflow run is valid. The validity check is done when workflow executions are aggregated so that the aggregator can recognize that task metrics were skipped. - * @param taskExecutionsForOneWorkflowRun - * @return - */ - public abstract Optional getWorkflowExecutionFromTaskExecutions(TaskExecutions taskExecutionsForOneWorkflowRun); - - /** - * Aggregate metrics from all submissions in the ExecutionsRequestBody. - * This method uses the runExecutions, and taskExecutions from ExecutionRequestBody to create an aggregated metric. - * Metrics are aggregated by: - *
    - *
  1. Aggregating task executions, provided via ExecutionRequestBody.taskExecutions, into workflow executions.
  2. - *
  3. Aggregating workflow executions,submitted via ExecutionRequestBody.runExecutions and workflow executions that were aggregated from task executions, into an aggregated metric. - *
- * @param allSubmissions - * @return - */ - @Override - public Optional getAggregatedMetricFromAllSubmissions(ExecutionsRequestBody allSubmissions) { - final List workflowExecutions = new ArrayList<>(allSubmissions.getRunExecutions()); - - // If task executions are present, calculate the workflow RunExecution containing the overall workflow-level execution time for each list of tasks - if (!allSubmissions.getTaskExecutions().isEmpty()) { - final List taskExecutionsWithMetric = getTaskExecutionsWithMetric(allSubmissions.getTaskExecutions()); - final List calculatedWorkflowExecutionsFromTasks = taskExecutionsWithMetric.stream() - .map(this::getWorkflowExecutionFromTaskExecutions) - .filter(Optional::isPresent) - .map(Optional::get) - .toList(); - workflowExecutions.addAll(calculatedWorkflowExecutionsFromTasks); - } - - - // Aggregate workflow executions into one metric - return getAggregatedMetricFromExecutions(workflowExecutions); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAthenaAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAthenaAggregator.java index fbb08c8d..094022d1 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAthenaAggregator.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/RunExecutionAthenaAggregator.java @@ -86,6 +86,7 @@ public void addGroupFields(Set> newGroupFields) { /** * Create the runexecutions query string using the SELECT and GROUP BY fields. + * De-duplicates executions with the same execution ID by taking the newest execution according to the S3 file modified time. * * @return */ diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationExecutionAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationExecutionAggregator.java deleted file mode 100644 index 5a68e132..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationExecutionAggregator.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import io.dockstore.common.metrics.ExecutionsRequestBody; -import io.dockstore.common.metrics.ValidationExecution; -import io.dockstore.openapi.client.model.Metric; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -/** - * A class defining the methods needed to aggregate workflow ValidationExecutions into aggregated metrics. - * @param The aggregated metric from the Metrics class, a class containing multiple types of aggregated metrics - * @param The execution metric to aggregate from the Execution - */ -public abstract class ValidationExecutionAggregator extends ExecutionAggregator { - - /** - * Aggregate metrics from all submissions in the ExecutionsRequestBody. - * This method uses the validationExecutions from ExecutionRequestBody to create an aggregated metric. - * Metrics are aggregated by: - *
    - *
  1. Aggregating workflow executions,submitted via ExecutionRequestBody.validationExecutions into an aggregated metric. - *
- * @param allSubmissions - * @return - */ - public Optional getAggregatedMetricFromAllSubmissions(ExecutionsRequestBody allSubmissions) { - final List workflowExecutions = new ArrayList<>(allSubmissions.getValidationExecutions()); - - // Aggregate workflow executions into one metric - return getAggregatedMetricFromExecutions(workflowExecutions); - } -} diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationStatusAggregator.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationStatusAggregator.java deleted file mode 100644 index 2f2d33db..00000000 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/helper/ValidationStatusAggregator.java +++ /dev/null @@ -1,201 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static io.dockstore.common.metrics.FormatCheckHelper.checkExecutionDateISO8601Format; -import static java.util.stream.Collectors.groupingBy; - -import io.dockstore.common.metrics.ValidationExecution; -import io.dockstore.common.metrics.ValidationExecution.ValidatorTool; -import io.dockstore.metricsaggregator.DoubleStatistics; -import io.dockstore.openapi.client.model.ValidationStatusMetric; -import io.dockstore.openapi.client.model.ValidatorInfo; -import io.dockstore.openapi.client.model.ValidatorVersionInfo; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; - -/** - * Aggregate Validation metrics from the list of validation executions by retrieving the validation information for the most recent execution of - * each validator tool version. - */ -public class ValidationStatusAggregator extends ValidationExecutionAggregator { - - @Override - public ValidationExecution getMetricFromExecution(ValidationExecution execution) { - return execution; // The entire execution contains the metric, not a specific field like with RunExecution - } - - @Override - public boolean validateExecutionMetric(ValidationExecution executionMetric) { - return true; - } - - @Override - public String getPropertyPathToValidate() { - return ""; - } - - /** - * Aggregate Validation metrics from the list of validation executions by retrieving the validation information for the most recent execution of - * each validator tool version. - * @param executionMetrics - * @return - */ - @Override - protected Optional calculateAggregatedMetricFromExecutionMetrics(List executionMetrics) { - if (executionMetrics.isEmpty()) { - return Optional.empty(); - } - - // Group executions by validator tool - Map> validatorToolToValidations = executionMetrics.stream() - .collect(groupingBy(ValidationExecution::getValidatorTool)); - - // For each validator tool, aggregate validation metrics for it - Map validatorToolToValidationInfo = new HashMap<>(); - validatorToolToValidations.forEach((validatorTool, validatorToolExecutions) -> { - Optional latestValidationExecution = getLatestValidationExecution(validatorToolExecutions); - - if (latestValidationExecution.isPresent()) { - // Group the validation executions for the validator tool by version - Map> validatorVersionNameToValidationExecutions = validatorToolExecutions.stream() - .collect(groupingBy(ValidationExecution::getValidatorToolVersion)); - - // Get the validation information for the most recent execution for each validator tool version - Map validatorVersionNameToVersionInfo = new HashMap<>(); - validatorVersionNameToValidationExecutions.forEach((validatorVersionName, validatorVersionExecutions) -> { - Optional latestValidationExecutionForVersion = getLatestValidationExecution(validatorVersionExecutions); - - latestValidationExecutionForVersion.ifPresent(validationExecution -> { - ValidatorVersionInfo validatorVersionInfo = new ValidatorVersionInfo() - .name(validatorVersionName) - .isValid(validationExecution.getIsValid()) - .dateExecuted(validationExecution.getDateExecuted()) - .numberOfRuns(validatorVersionExecutions.size()) - .passingRate(getPassingRate(validatorVersionExecutions)); - - if (!validationExecution.getIsValid() && StringUtils.isNotBlank(validationExecution.getErrorMessage())) { - validatorVersionInfo.errorMessage(validationExecution.getErrorMessage()); - } - - validatorVersionNameToVersionInfo.put(validatorVersionName, validatorVersionInfo); - }); - }); - - // Set validation info for the validator tool - ValidatorInfo validatorInfo = new ValidatorInfo() - .mostRecentVersionName(validatorVersionNameToVersionInfo.get(latestValidationExecution.get().getValidatorToolVersion()).getName()) - .validatorVersions(validatorVersionNameToVersionInfo.values().stream().toList()) - .numberOfRuns(validatorToolExecutions.size()) - .passingRate(getPassingRate(validatorToolExecutions)); - - validatorToolToValidationInfo.put(validatorTool.toString(), validatorInfo); - } - }); - - // This shouldn't happen because all validation executions should the required fields, but check anyway - if (validatorToolToValidationInfo.isEmpty()) { - return Optional.empty(); - } - return Optional.of(new ValidationStatusMetric().validatorTools(validatorToolToValidationInfo)); - } - - @Override - protected Optional calculateAggregatedMetricFromAggregatedMetrics( - List aggregatedMetrics) { - Map newValidatorToolToValidatorInfo = new HashMap<>(); - if (!aggregatedMetrics.isEmpty()) { - // Go through all the ValidationStatusMetrics and group the ValidationVersionInfos by validator tool - Map> validatorToolToValidationVersionInfos = aggregatedMetrics.stream() - .map(ValidationStatusMetric::getValidatorTools) - .flatMap(validatorToolToValidatorInfoMap -> validatorToolToValidatorInfoMap.entrySet().stream()) - .collect(groupingBy(Map.Entry::getKey, Collectors.flatMapping(entry -> { - ValidatorInfo validationInfoForValidatorTool = entry.getValue(); - return validationInfoForValidatorTool.getValidatorVersions().stream(); - }, Collectors.toList()))); - - // For each validator tool, find the most recent ValidatorVersionInfo for each version - validatorToolToValidationVersionInfos.forEach((validatorTool, validationVersionInfosByValidatorTool) -> { - // Number of runs across all versions - final int numberOfRuns = validationVersionInfosByValidatorTool.stream().map(ValidatorVersionInfo::getNumberOfRuns) - .mapToInt(Integer::intValue) - .sum(); - final List validationRunsStatistics = validationVersionInfosByValidatorTool.stream() - .map(validatorVersionInfo -> new DoubleStatistics(validatorVersionInfo.getPassingRate(), validatorVersionInfo.getNumberOfRuns())) - .toList(); - - final double passingRate = DoubleStatistics.createFromStatistics(validationRunsStatistics).getAverage(); - final Optional mostRecentValidationVersion = getLatestValidationVersionInfo(validationVersionInfosByValidatorTool); - - if (mostRecentValidationVersion.isPresent()) { - // Group ValidatorVersionInfo by version name - Map> versionNameToValidationVersionInfos = validationVersionInfosByValidatorTool.stream() - .collect(Collectors.groupingBy(ValidatorVersionInfo::getName)); - - // Get a list of the most recent ValidatorVersionInfo for each version - List mostRecentValidationVersionInfos = versionNameToValidationVersionInfos.values().stream().map(this::getLatestValidationVersionInfo).filter(Optional::isPresent).map(Optional::get).toList(); - - // Set validation info for the validator tool - ValidatorInfo validatorInfo = new ValidatorInfo() - .mostRecentVersionName(mostRecentValidationVersion.get().getName()) - .validatorVersions(mostRecentValidationVersionInfos) - .numberOfRuns(numberOfRuns) - .passingRate(passingRate); - - newValidatorToolToValidatorInfo.put(validatorTool, validatorInfo); - } - }); - } - - if (newValidatorToolToValidatorInfo.isEmpty()) { - return Optional.empty(); - } - return Optional.of(new ValidationStatusMetric().validatorTools(newValidatorToolToValidatorInfo)); - } - - static Optional getLatestValidationExecution(List executions) { - if (executions.isEmpty()) { - return Optional.empty(); - } - - boolean containsInvalidDate = executions.stream().anyMatch(execution -> checkExecutionDateISO8601Format(execution.getDateExecuted()).isEmpty()); - if (containsInvalidDate) { - return Optional.empty(); - } - - return executions.stream() - .max(Comparator.comparing(execution -> checkExecutionDateISO8601Format(execution.getDateExecuted()).get(), Date::compareTo)); - } - - Optional getLatestValidationVersionInfo(List validationVersionInfos) { - if (validationVersionInfos.isEmpty()) { - return Optional.empty(); - } - - boolean containsInvalidDate = validationVersionInfos.stream().anyMatch(execution -> checkExecutionDateISO8601Format(execution.getDateExecuted()).isEmpty()); - if (containsInvalidDate) { - return Optional.empty(); - } - - return validationVersionInfos.stream() - .max(Comparator.comparing(validatorVersionInfo -> checkExecutionDateISO8601Format(validatorVersionInfo.getDateExecuted()).get(), Date::compareTo)); - } - - /** - * Gets the percentage of executions that passed validation - * @param executions - * @return - */ - static double getPassingRate(List executions) { - final int oneHundredPercent = 100; - final double numberOfPassingExecutions = executions.stream() - .filter(ValidationExecution::getIsValid) - .count(); - - return (numberOfPassingExecutions / executions.size()) * oneHundredPercent; - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/DoubleStatisticsTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/DoubleStatisticsTest.java deleted file mode 100644 index f3c6c8d3..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/DoubleStatisticsTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2023 OICR and UCSC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.dockstore.metricsaggregator; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.List; -import org.junit.jupiter.api.Test; - -class DoubleStatisticsTest { - - @Test - void testStatistic() { - List fibonacci = List.of(0.0, 1.0, 1.0, 2.0, 3.0, 5.0, 8.0, 13.0, 21.0, 34.0, 55.0, 89.0, 144.0); - DoubleStatistics statistics = new DoubleStatistics(fibonacci); - assertEquals(0, statistics.getMinimum()); - assertEquals(144, statistics.getMaximum()); - assertEquals(29, Math.round(statistics.getAverage())); - assertEquals(13, statistics.getNumberOfDataPoints()); - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/MoneyStatisticsTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/MoneyStatisticsTest.java deleted file mode 100644 index 1e3f7138..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/MoneyStatisticsTest.java +++ /dev/null @@ -1,28 +0,0 @@ -package io.dockstore.metricsaggregator; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.math.RoundingMode; -import java.util.List; -import java.util.stream.Stream; -import javax.money.Monetary; -import javax.money.MonetaryRounding; -import javax.money.RoundingQueryBuilder; -import org.javamoney.moneta.Money; -import org.junit.jupiter.api.Test; - -class MoneyStatisticsTest { - @Test - void testStatistic() { - List fibonacci = Stream.of(0.0, 1.0, 1.0, 2.0, 3.0, 5.0, 8.0, 13.0, 21.0, 34.0, 55.0, 89.0, 144.0) - .map(value -> Money.of(value, "USD")) - .toList(); - MoneyStatistics statistics = new MoneyStatistics(fibonacci); - assertEquals(Money.of(0, "USD"), statistics.getMinimum()); - assertEquals(Money.of(144, "USD"), statistics.getMaximum()); - MonetaryRounding rounding = Monetary.getRounding( - RoundingQueryBuilder.of().setScale(2).set(RoundingMode.HALF_UP).build()); - assertEquals(Money.of(28.92, "USD"), statistics.getAverage().with(rounding)); - assertEquals(13, statistics.getNumberOfDataPoints()); - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClientIT.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClientIT.java index acf1e0bb..6c6b99c8 100644 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClientIT.java +++ b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/client/cli/MetricsAggregatorClientIT.java @@ -83,6 +83,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import software.amazon.awssdk.services.s3.S3Client; @@ -136,6 +137,7 @@ public static void afterClass() { SUPPORT.after(); } + @Disabled(value = "Disabled because localstack does not support Athena in the free version") @Test @SuppressWarnings("checkstyle:methodlength") void testAggregateMetrics() { @@ -567,6 +569,7 @@ void testAggregateMetricsErrors() throws Exception { /** * Test that the metrics aggregator takes the newest execution if there are executions with duplicate IDs. */ + @Disabled(value = "Disabled because localstack does not support Athena in the free version") @Test void testAggregateExecutionsWithDuplicateIds() { final ApiClient apiClient = CommonTestUtilities.getOpenAPIWebClient(true, ADMIN_USERNAME, testingPostgres); @@ -635,6 +638,7 @@ void testAggregateExecutionsWithDuplicateIds() { assertNull(metrics.getValidationStatus()); // Verify that the metric from validation execution wasn't used } + @Disabled(value = "Disabled because localstack does not support Athena in the free version") @Test void testAggregateWithSkippedExecutions() { final ApiClient apiClient = CommonTestUtilities.getOpenAPIWebClient(true, ADMIN_USERNAME, testingPostgres); diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/AggregationHelperTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/AggregationHelperTest.java deleted file mode 100644 index 67286304..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/AggregationHelperTest.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static io.dockstore.metricsaggregator.common.TestUtilities.generateExecutionId; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.dockstore.common.metrics.ExecutionsRequestBody; -import io.dockstore.common.metrics.ValidationExecution; -import io.dockstore.common.metrics.ValidationExecution.ValidatorTool; -import io.dockstore.openapi.client.model.ValidationStatusMetric; -import io.dockstore.openapi.client.model.ValidatorInfo; -import io.dockstore.openapi.client.model.ValidatorVersionInfo; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import org.junit.jupiter.api.Test; - -class AggregationHelperTest { - - @Test - void testGetAggregatedValidationStatus() { - List executions = new ArrayList<>(); - Optional validationStatusMetric = new ValidationStatusAggregator().getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - assertTrue(validationStatusMetric.isEmpty()); - - // Add an execution with validation data - final ValidatorTool validatorTool = ValidatorTool.MINIWDL; - final String validatorToolVersion1 = "1.0"; - executions.add(createValidationExecution(validatorTool, validatorToolVersion1, true)); - validationStatusMetric = new ValidationStatusAggregator().getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - assertTrue(validationStatusMetric.isPresent()); - ValidatorInfo validatorInfo = validationStatusMetric.get().getValidatorTools().get(validatorTool.toString()); - assertNotNull(validatorInfo); - assertNotNull(validatorInfo.getMostRecentVersionName()); - ValidatorVersionInfo mostRecentValidatorVersion = validatorInfo.getValidatorVersions().stream().filter(validationVersion -> validatorToolVersion1.equals(validationVersion.getName())).findFirst().get(); - assertTrue(mostRecentValidatorVersion.isIsValid()); - assertEquals(validatorToolVersion1, mostRecentValidatorVersion.getName()); - assertNull(mostRecentValidatorVersion.getErrorMessage()); - assertEquals(1, mostRecentValidatorVersion.getNumberOfRuns()); - assertEquals(100, mostRecentValidatorVersion.getPassingRate()); - assertEquals(1, validatorInfo.getValidatorVersions().size(), "There should be 2 ValidatorVersionInfo objects because 1 version was ran"); - assertEquals(1, validatorInfo.getNumberOfRuns()); - assertEquals(100, validatorInfo.getPassingRate()); - - // Add an execution that isn't valid for the same validator - final String validatorToolVersion2 = "2.0"; - ValidationExecution failedValidationExecution = createValidationExecution(validatorTool, validatorToolVersion2, false); - failedValidationExecution.setErrorMessage("This is an error message"); - executions.add(failedValidationExecution); - validationStatusMetric = new ValidationStatusAggregator().getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - assertTrue(validationStatusMetric.isPresent()); - validatorInfo = validationStatusMetric.get().getValidatorTools().get(validatorTool.toString()); - mostRecentValidatorVersion = validatorInfo.getValidatorVersions().stream().filter(validatorVersion -> validatorToolVersion2.equals(validatorVersion.getName())).findFirst().get(); - assertFalse(mostRecentValidatorVersion.isIsValid()); - assertEquals(validatorToolVersion2, mostRecentValidatorVersion.getName()); - assertEquals("This is an error message", mostRecentValidatorVersion.getErrorMessage()); - assertEquals(1, mostRecentValidatorVersion.getNumberOfRuns()); - assertEquals(0, mostRecentValidatorVersion.getPassingRate()); - assertEquals(2, validatorInfo.getValidatorVersions().size(), "There should be 2 ValidatorVersionInfo objects because 2 versions were ran"); - assertEquals(2, validatorInfo.getNumberOfRuns()); - assertEquals(50, validatorInfo.getPassingRate()); - - // Add an execution that is valid for the same validator - String expectedDateExecuted = Instant.now().toString(); - ValidationExecution validationExecution = createValidationExecution(validatorTool, validatorToolVersion1, true); - validationExecution.setDateExecuted(expectedDateExecuted); - executions.add(validationExecution); - validationStatusMetric = new ValidationStatusAggregator().getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - assertTrue(validationStatusMetric.isPresent()); - validatorInfo = validationStatusMetric.get().getValidatorTools().get(validatorTool.toString()); - mostRecentValidatorVersion = validatorInfo.getValidatorVersions().stream().filter(validationVersion -> validatorToolVersion1.equals(validationVersion.getName())).findFirst().get(); - assertTrue(mostRecentValidatorVersion.isIsValid(), "Should be true because the latest validation is valid"); - assertEquals(validatorToolVersion1, mostRecentValidatorVersion.getName()); - assertNull(mostRecentValidatorVersion.getErrorMessage()); - assertEquals(2, mostRecentValidatorVersion.getNumberOfRuns()); - assertEquals(100, mostRecentValidatorVersion.getPassingRate()); - assertEquals(expectedDateExecuted, mostRecentValidatorVersion.getDateExecuted()); // Check that this is the most recent ValidatorVersionInfo for this version because it was executed twice - assertEquals(2, validatorInfo.getValidatorVersions().size(), "There should be 2 ValidatorVersionInfo objects because 2 versions ran"); - assertEquals(3, validatorInfo.getNumberOfRuns()); - assertEquals(66.66666666666666, validatorInfo.getPassingRate()); - } - - private ValidationExecution createValidationExecution(ValidatorTool validatorTool, String validatorToolVersion, boolean isValid) { - ValidationExecution validationExecution = new ValidationExecution(validatorTool, isValid); - validationExecution.setValidatorToolVersion(validatorToolVersion); - validationExecution.setExecutionId(generateExecutionId()); - validationExecution.setDateExecuted(Instant.now().toString()); - return validationExecution; - } - - private ExecutionsRequestBody createExecutionsRequestBody(List validationExecutions) { - ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody(); - executionsRequestBody.setValidationExecutions(validationExecutions); - return executionsRequestBody; - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CostAggregatorTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CostAggregatorTest.java deleted file mode 100644 index 6333e9e2..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CostAggregatorTest.java +++ /dev/null @@ -1,81 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.dockstore.common.metrics.Cost; -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.openapi.client.model.CostMetric; -import java.util.List; -import java.util.Optional; -import org.junit.jupiter.api.Test; - -class CostAggregatorTest { - private static final CostAggregator COST_AGGREGATOR = new CostAggregator(); - - /** - * Tests that the aggregator calculates the correct workflow RunExecution from a list of task RunExecutions. - */ - @Test - void testGetWorkflowExecutionFromTaskExecutions() { - // Empty TaskExecutions should return Optional.empty() - Optional workflowExecution = COST_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(new TaskExecutions()); - assertTrue(workflowExecution.isEmpty()); - - // The workflow execution generated from a single task should have the same cost as the one task - TaskExecutions taskExecutions = new TaskExecutions(); - taskExecutions.setTaskExecutions(List.of(createRunExecution(1.0))); - workflowExecution = COST_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(1.0, workflowExecution.get().getCost().getValue()); - - // The workflow execution generated from multiple tasks should have the sum of costs from the list of tasks - taskExecutions.setTaskExecutions(List.of( - createRunExecution(1.0), - createRunExecution(2.0), - createRunExecution(3.0) - )); - workflowExecution = COST_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(6.0, workflowExecution.get().getCost().getValue()); - } - - /** - * Tests that the aggregator calculates the correct aggregated metric from a list of workflow RunExecutions - */ - @Test - void testGetAggregatedMetricFromWorkflowExecutions() { - // Empty list should return Optional.empty() - Optional costMetric = COST_AGGREGATOR.getAggregatedMetricFromExecutions(List.of()); - assertTrue(costMetric.isEmpty()); - - // Test the metric calculated from a single workflow execution. The min, max, and average should be the same value as the single execution - List workflowExecutions = List.of(createRunExecution(1.0)); - costMetric = COST_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(costMetric.isPresent()); - assertEquals(1.0, costMetric.get().getMinimum()); - assertEquals(1.0, costMetric.get().getMaximum()); - assertEquals(1.0, costMetric.get().getAverage()); - assertEquals(1, costMetric.get().getNumberOfDataPointsForAverage()); - - // Test the metric calculated from multiple workflow executions. - workflowExecutions = List.of( - createRunExecution(2.0), - createRunExecution(4.0), - createRunExecution(6.0) - ); - costMetric = COST_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(costMetric.isPresent()); - assertEquals(2.0, costMetric.get().getMinimum()); - assertEquals(6.0, costMetric.get().getMaximum()); - assertEquals(4.0, costMetric.get().getAverage()); - assertEquals(3, costMetric.get().getNumberOfDataPointsForAverage()); - } - - private RunExecution createRunExecution(Double costValue) { - RunExecution runExecution = new RunExecution(); - runExecution.setCost(new Cost(costValue)); - return runExecution; - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CpuAggregatorTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CpuAggregatorTest.java deleted file mode 100644 index ccd1b507..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/CpuAggregatorTest.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.openapi.client.model.CpuMetric; -import java.util.List; -import java.util.Optional; -import org.junit.jupiter.api.Test; - -class CpuAggregatorTest { - private static final CpuAggregator CPU_AGGREGATOR = new CpuAggregator(); - - /** - * Tests that the aggregator calculates the correct workflow RunExecution from a list of task RunExecutions. - */ - @Test - void testGetWorkflowExecutionFromTaskExecutions() { - // Empty TaskExecutions should return Optional.empty() - Optional workflowExecution = CPU_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(new TaskExecutions()); - assertTrue(workflowExecution.isEmpty()); - - // The workflow execution generated from a single task should have the same cpuRequirements as the one task - TaskExecutions taskExecutions = new TaskExecutions(); - taskExecutions.setTaskExecutions(List.of(createRunExecution(1))); - workflowExecution = CPU_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(1, workflowExecution.get().getCpuRequirements()); - - // The workflow execution generated from multiple tasks should have the cpuRequirements from the highest cpuRequirements from the list of tasks - taskExecutions.setTaskExecutions(List.of( - createRunExecution(1), - createRunExecution(2), - createRunExecution(3) - )); - workflowExecution = CPU_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(3, workflowExecution.get().getCpuRequirements()); - } - - /** - * Tests that the aggregator calculates the correct aggregated metric from a list of workflow RunExecutions - */ - @Test - void testGetAggregatedMetricFromWorkflowExecutions() { - // Empty list should return Optional.empty() - Optional cpuMetric = CPU_AGGREGATOR.getAggregatedMetricFromExecutions(List.of()); - assertTrue(cpuMetric.isEmpty()); - - // Test the metric calculated from a single workflow execution. The min, max, and average should be the same value as the single execution - List workflowExecutions = List.of(createRunExecution(1)); - cpuMetric = CPU_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(cpuMetric.isPresent()); - assertEquals(1.0, cpuMetric.get().getMinimum()); - assertEquals(1.0, cpuMetric.get().getMaximum()); - assertEquals(1.0, cpuMetric.get().getAverage()); - assertEquals(1, cpuMetric.get().getNumberOfDataPointsForAverage()); - - // Test the metric calculated from multiple workflow executions. - workflowExecutions = List.of( - createRunExecution(2), - createRunExecution(4), - createRunExecution(6) - ); - cpuMetric = CPU_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(cpuMetric.isPresent()); - assertEquals(2.0, cpuMetric.get().getMinimum()); - assertEquals(6.0, cpuMetric.get().getMaximum()); - assertEquals(4.0, cpuMetric.get().getAverage()); - assertEquals(3, cpuMetric.get().getNumberOfDataPointsForAverage()); - } - - private RunExecution createRunExecution(Integer cpuRequirements) { - RunExecution runExecution = new RunExecution(); - runExecution.setCpuRequirements(cpuRequirements); - return runExecution; - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregatorTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregatorTest.java deleted file mode 100644 index 21b0aea6..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionStatusAggregatorTest.java +++ /dev/null @@ -1,362 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static io.dockstore.common.metrics.ExecutionStatus.FAILED_RUNTIME_INVALID; -import static io.dockstore.common.metrics.ExecutionStatus.FAILED_SEMANTIC_INVALID; -import static io.dockstore.common.metrics.ExecutionStatus.SUCCESSFUL; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.dockstore.common.metrics.Cost; -import io.dockstore.common.metrics.ExecutionStatus; -import io.dockstore.common.metrics.ExecutionsRequestBody; -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.openapi.client.model.CostMetric; -import io.dockstore.openapi.client.model.CpuMetric; -import io.dockstore.openapi.client.model.ExecutionStatusMetric; -import io.dockstore.openapi.client.model.ExecutionTimeMetric; -import io.dockstore.openapi.client.model.MemoryMetric; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import org.junit.jupiter.api.Test; - -class ExecutionStatusAggregatorTest { - private static final ExecutionStatusAggregator EXECUTION_STATUS_AGGREGATOR = new ExecutionStatusAggregator(); - - /** - * Tests that the aggregator calculates the correct workflow RunExecution from a list of task RunExecutions. - */ - @Test - void testGetWorkflowExecutionFromTaskExecutions() { - // Empty TaskExecutions should return Optional.empty() - Optional workflowExecution = EXECUTION_STATUS_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(new TaskExecutions()); - assertTrue(workflowExecution.isEmpty()); - - // The workflow execution generated from a single task should have the same status as the one task - TaskExecutions taskExecutions = createTaskExecutions(new RunExecution(SUCCESSFUL)); - workflowExecution = EXECUTION_STATUS_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(SUCCESSFUL, workflowExecution.get().getExecutionStatus()); - - // The workflow execution generated from tasks that were all successful should have a successful status - taskExecutions = createTaskExecutions( - new RunExecution(SUCCESSFUL), - new RunExecution(SUCCESSFUL), - new RunExecution(SUCCESSFUL) - ); - workflowExecution = EXECUTION_STATUS_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(SUCCESSFUL, workflowExecution.get().getExecutionStatus()); - - // The workflow execution generated from tasks that where there are failures should have the most frequent failed status - taskExecutions = createTaskExecutions( - new RunExecution(SUCCESSFUL), - new RunExecution(FAILED_SEMANTIC_INVALID), - new RunExecution(FAILED_RUNTIME_INVALID), - new RunExecution(FAILED_RUNTIME_INVALID) - ); - workflowExecution = EXECUTION_STATUS_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - // Should be FAILED_RUNTIME_INVALID because it's the most frequent failed status - assertEquals(FAILED_RUNTIME_INVALID, workflowExecution.get().getExecutionStatus()); - // When there are equal counts of all failed statuses, the workflow execution status should be one of them - taskExecutions = createTaskExecutions( - new RunExecution(SUCCESSFUL), - new RunExecution(FAILED_SEMANTIC_INVALID), - new RunExecution(FAILED_RUNTIME_INVALID) - ); - workflowExecution = EXECUTION_STATUS_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertTrue(workflowExecution.get().getExecutionStatus() == FAILED_RUNTIME_INVALID - || workflowExecution.get().getExecutionStatus() == FAILED_SEMANTIC_INVALID); - } - - /** - * Tests that the aggregator calculates the correct aggregated metric from a list of workflow RunExecutions - */ - @Test - void testGetAggregatedMetricFromWorkflowExecutions() { - // Empty list should return Optional.empty() - Optional executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromExecutions(List.of()); - assertTrue(executionStatusMetric.isEmpty()); - - // Test the metric calculated from a single workflow execution. - List workflowExecutions = List.of(new RunExecution(SUCCESSFUL)); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(executionStatusMetric.isPresent()); - assertEquals(1, executionStatusMetric.get().getCount().get(SUCCESSFUL.toString()).getExecutionStatusCount()); - assertFalse(executionStatusMetric.get().getCount().containsKey(FAILED_SEMANTIC_INVALID.toString())); - assertFalse(executionStatusMetric.get().getCount().containsKey(FAILED_RUNTIME_INVALID.toString())); - - // Test the metric calculated from multiple workflow executions. - workflowExecutions = List.of( - new RunExecution(SUCCESSFUL), - new RunExecution(FAILED_SEMANTIC_INVALID), - new RunExecution(FAILED_RUNTIME_INVALID) - ); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(executionStatusMetric.isPresent()); - assertEquals(1, executionStatusMetric.get().getCount().get(SUCCESSFUL.toString()).getExecutionStatusCount()); - assertEquals(1, executionStatusMetric.get().getCount().get(FAILED_SEMANTIC_INVALID.toString()).getExecutionStatusCount()); - assertEquals(1, executionStatusMetric.get().getCount().get(FAILED_RUNTIME_INVALID.toString()).getExecutionStatusCount()); - } - - @Test - void testGetAggregatedExecutionStatus() { - ExecutionStatusAggregator executionStatusAggregator = new ExecutionStatusAggregator(); - ExecutionsRequestBody allSubmissions = new ExecutionsRequestBody(); - Optional executionStatusMetric = executionStatusAggregator.getAggregatedMetricFromAllSubmissions(allSubmissions); - assertTrue(executionStatusMetric.isEmpty()); - - RunExecution submittedRunExecution = new RunExecution(SUCCESSFUL); - allSubmissions = createExecutionsRequestBody(List.of(submittedRunExecution)); - executionStatusMetric = executionStatusAggregator.getAggregatedMetricFromAllSubmissions(allSubmissions); - assertTrue(executionStatusMetric.isPresent()); - assertEquals(1, executionStatusMetric.get().getCount().get(SUCCESSFUL.toString()).getExecutionStatusCount()); - - // Aggregate submissions containing workflow run executions and task executions - submittedRunExecution = new RunExecution(SUCCESSFUL); - TaskExecutions taskExecutionsForOneWorkflowRun = createTaskExecutions(submittedRunExecution); - allSubmissions = createExecutionsRequestBody(List.of(submittedRunExecution), List.of(taskExecutionsForOneWorkflowRun)); - executionStatusMetric = executionStatusAggregator.getAggregatedMetricFromAllSubmissions(allSubmissions); - assertTrue(executionStatusMetric.isPresent()); - assertEquals(2, executionStatusMetric.get().getCount().get(SUCCESSFUL.toString()).getExecutionStatusCount()); - // Submit one successful workflow execution and a list of task executions where the single task failed - taskExecutionsForOneWorkflowRun = createTaskExecutions(new RunExecution(FAILED_RUNTIME_INVALID)); - allSubmissions = createExecutionsRequestBody(List.of(submittedRunExecution), List.of(taskExecutionsForOneWorkflowRun)); - executionStatusMetric = executionStatusAggregator.getAggregatedMetricFromAllSubmissions(allSubmissions); - assertTrue(executionStatusMetric.isPresent()); - assertEquals(1, executionStatusMetric.get().getCount().get(SUCCESSFUL.toString()).getExecutionStatusCount()); - assertEquals(1, executionStatusMetric.get().getCount().get(FAILED_RUNTIME_INVALID.toString()).getExecutionStatusCount()); - // Submit one successful workflow execution and a list of task executions where there are two tasks that failed due to different reasons - taskExecutionsForOneWorkflowRun = createTaskExecutions(new RunExecution(FAILED_RUNTIME_INVALID), new RunExecution(FAILED_SEMANTIC_INVALID)); - allSubmissions = createExecutionsRequestBody(List.of(submittedRunExecution), List.of(taskExecutionsForOneWorkflowRun)); - executionStatusMetric = executionStatusAggregator.getAggregatedMetricFromAllSubmissions(allSubmissions); - assertTrue(executionStatusMetric.isPresent()); - assertEquals(1, executionStatusMetric.get().getCount().get(SUCCESSFUL.toString()).getExecutionStatusCount()); - // There should be 1 of either FAILED_RUNTIME_INVALID or FAILED_SEMANTIC_INVALID. - assertTrue(executionStatusMetric.get().getCount().containsKey(FAILED_RUNTIME_INVALID.toString()) || executionStatusMetric.get().getCount().containsKey(FAILED_SEMANTIC_INVALID.toString())); - // Submit one successful workflow execution and a list of task executions where there are 3 tasks that failed due to different reasons - taskExecutionsForOneWorkflowRun.setTaskExecutions( - List.of(new RunExecution(FAILED_RUNTIME_INVALID), - new RunExecution(FAILED_RUNTIME_INVALID), - new RunExecution(FAILED_SEMANTIC_INVALID))); - allSubmissions = createExecutionsRequestBody(List.of(submittedRunExecution), List.of(taskExecutionsForOneWorkflowRun)); - executionStatusMetric = executionStatusAggregator.getAggregatedMetricFromAllSubmissions(allSubmissions); - assertTrue(executionStatusMetric.isPresent()); - assertEquals(1, executionStatusMetric.get().getCount().get(SUCCESSFUL.toString()).getExecutionStatusCount()); - // There should be one count of FAILED_RUNTIME_INVALID because it's the most frequent failed status in the list of task executions - assertEquals(1, executionStatusMetric.get().getCount().get(FAILED_RUNTIME_INVALID.toString()).getExecutionStatusCount()); - } - - @Test - void testGetAggregatedExecutionTime() { - List badExecutions = new ArrayList<>(); - - // Add an execution that doesn't have execution time data - badExecutions.add(new RunExecution(SUCCESSFUL)); - Optional executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(badExecutions)); - assertTrue(executionStatusMetric.isPresent()); - ExecutionTimeMetric executionTimeMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getExecutionTime(); - assertNull(executionTimeMetric); - - // Add an execution with malformed execution time data - badExecutions.add(createRunExecutionWithExecutionTime(SUCCESSFUL, "1 second")); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(badExecutions)); - executionTimeMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getExecutionTime(); - assertNull(executionTimeMetric); - - // Add an execution with execution time - final int timeInSeconds = 10; - List executions = List.of(createRunExecutionWithExecutionTime(SUCCESSFUL, String.format("PT%dS", timeInSeconds))); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - executionTimeMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getExecutionTime(); - assertNotNull(executionTimeMetric); - assertEquals(timeInSeconds, executionTimeMetric.getMinimum()); - assertEquals(timeInSeconds, executionTimeMetric.getMaximum()); - assertEquals(timeInSeconds, executionTimeMetric.getAverage()); - assertEquals(1, executionTimeMetric.getNumberOfDataPointsForAverage()); - - // Aggregate submissions containing workflow run executions and task executions - // Submit a single workflow execution that took 10s and a single task that took 10s - executions = List.of(createRunExecutionWithExecutionTime(SUCCESSFUL, String.format("PT%dS", timeInSeconds))); - TaskExecutions taskExecutions = new TaskExecutions(); - taskExecutions.setTaskExecutions(executions); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions, List.of(taskExecutions))); - executionTimeMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getExecutionTime(); - assertNotNull(executionTimeMetric); - assertEquals(timeInSeconds, executionTimeMetric.getMinimum()); - assertEquals(timeInSeconds, executionTimeMetric.getMaximum()); - assertEquals(timeInSeconds, executionTimeMetric.getAverage()); - assertEquals(2, executionTimeMetric.getNumberOfDataPointsForAverage()); // There should be 2 data points: 1 for the workflow execution and 1 for the list of tasks - // Submit a single workflow execution that took 10s and two tasks that took 10 seconds. This time, dateExecuted is provided - RunExecution task1 = createRunExecutionWithExecutionTime(SUCCESSFUL, String.format("PT%dS", timeInSeconds)); - RunExecution task2 = createRunExecutionWithExecutionTime(SUCCESSFUL, String.format("PT%dS", timeInSeconds)); - // The time difference between these two tasks is 10 seconds. When there is more than one task, the duration will be calculated from the dates executed, plus the duration of the last task, which is 10s - task1.setDateExecuted("2023-11-09T21:54:10.571285905Z"); - task2.setDateExecuted("2023-11-09T21:54:20.571285905Z"); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions, List.of(createTaskExecutions(task1, task2)))); - executionTimeMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getExecutionTime(); - assertNotNull(executionTimeMetric); - assertEquals(10, executionTimeMetric.getMinimum(), "The minimum is from the workflow execution"); - assertEquals(20, executionTimeMetric.getMaximum(), "The maximum is from the workflow execution calculated from the two tasks"); - assertEquals(15, executionTimeMetric.getAverage()); - assertEquals(2, executionTimeMetric.getNumberOfDataPointsForAverage()); // There should be 2 data points: 1 for the workflow execution and 1 for the list of tasks - } - - @Test - void testGetAggregatedCpu() { - List executions = new ArrayList<>(); - - // Add an execution that doesn't have cpu data - executions.add(new RunExecution(SUCCESSFUL)); - Optional executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - CpuMetric cpuMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getCpu(); - assertNull(cpuMetric); - - // Add an execution with cpu data - final int cpu = 1; - executions.add(createRunExecutionWithCpu(SUCCESSFUL, cpu)); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - cpuMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getCpu(); - assertNotNull(cpuMetric); - assertEquals(cpu, cpuMetric.getMinimum()); - assertEquals(cpu, cpuMetric.getMaximum()); - assertEquals(cpu, cpuMetric.getAverage()); - assertEquals(1, cpuMetric.getNumberOfDataPointsForAverage()); - - // Aggregate submissions containing workflow run executions and task executions - executions = List.of(createRunExecutionWithCpu(SUCCESSFUL, cpu)); - // Two task executions with different CPU requirements. The workflow execution calculated from these tasks should take the highest cpuRequirement from the tasks - TaskExecutions taskExecutions = createTaskExecutions( - createRunExecutionWithCpu(SUCCESSFUL, 1), - createRunExecutionWithCpu(SUCCESSFUL, 4)); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions, List.of(taskExecutions))); - cpuMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getCpu(); - assertNotNull(cpuMetric); - assertEquals(1.0, cpuMetric.getMinimum()); - assertEquals(4.0, cpuMetric.getMaximum()); - assertEquals(2.5, cpuMetric.getAverage()); - assertEquals(2, cpuMetric.getNumberOfDataPointsForAverage()); // Two data points: 1 from the workflow execution and 1 for the list of tasks - } - - @Test - void testGetAggregatedMemory() { - List executions = new ArrayList<>(); - - // Add an execution that doesn't have memory data - executions.add(new RunExecution(SUCCESSFUL)); - Optional executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - MemoryMetric memoryMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getMemory(); - assertNull(memoryMetric); - - // Add an execution with memory data - Double memoryInGB = 2.0; - executions.add(createRunExecutionWithMemory(SUCCESSFUL, memoryInGB)); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - memoryMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getMemory(); - assertNotNull(memoryMetric); - assertEquals(memoryInGB, memoryMetric.getMinimum()); - assertEquals(memoryInGB, memoryMetric.getMaximum()); - assertEquals(memoryInGB, memoryMetric.getAverage()); - assertEquals(1, memoryMetric.getNumberOfDataPointsForAverage()); - - // Aggregate submissions containing workflow run executions and task executions - executions = List.of(createRunExecutionWithMemory(SUCCESSFUL, 2.0)); - // Two task executions with different memory requirements. The workflow execution calculated from these tasks should take the highest memoryRequirementsGB from the tasks - TaskExecutions taskExecutions = createTaskExecutions( - createRunExecutionWithMemory(SUCCESSFUL, 2.0), - createRunExecutionWithMemory(SUCCESSFUL, 4.0)); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions, List.of(taskExecutions))); - memoryMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getMemory(); - assertNotNull(memoryMetric); - assertEquals(2.0, memoryMetric.getMinimum()); - assertEquals(4.0, memoryMetric.getMaximum()); - assertEquals(3.0, memoryMetric.getAverage()); - assertEquals(2, memoryMetric.getNumberOfDataPointsForAverage()); // Two data points: 1 from the workflow execution and 1 for the list of tasks - } - - @Test - void testGetAggregatedCost() { - List executions = new ArrayList<>(); - - // Add an execution that doesn't have cost data - executions.add(new RunExecution(SUCCESSFUL)); - Optional executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - assertTrue(executionStatusMetric.isPresent()); - CostMetric costMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getCost(); - assertNull(costMetric); - - // Add an execution with cost data - Double costInUSD = 2.00; - executions.add(createRunExecutionWithCost(SUCCESSFUL, costInUSD)); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions)); - costMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getCost(); - assertNotNull(costMetric); - assertEquals(costInUSD, costMetric.getMinimum()); - assertEquals(costInUSD, costMetric.getMaximum()); - assertEquals(costInUSD, costMetric.getAverage()); - assertEquals(1, costMetric.getNumberOfDataPointsForAverage()); - - // Aggregate submissions containing workflow run executions and task executions - executions = List.of(createRunExecutionWithCost(SUCCESSFUL, 2.00)); - // Two task executions with different memory requirements. The workflow execution calculated from these tasks should have the sum of the cost of all tasks - TaskExecutions taskExecutions = createTaskExecutions( - createRunExecutionWithCost(SUCCESSFUL, 2.00), - createRunExecutionWithCost(SUCCESSFUL, 4.00) - ); - executionStatusMetric = EXECUTION_STATUS_AGGREGATOR.getAggregatedMetricFromAllSubmissions(createExecutionsRequestBody(executions, List.of(taskExecutions))); - costMetric = executionStatusMetric.get().getCount().get(SUCCESSFUL.name()).getCost(); - assertNotNull(costMetric); - assertEquals(2.0, costMetric.getMinimum()); - assertEquals(6.0, costMetric.getMaximum()); // The max is the cost of the two tasks summed together - assertEquals(4.0, costMetric.getAverage()); - assertEquals(2, costMetric.getNumberOfDataPointsForAverage()); // Two data points: 1 from the workflow execution and 1 for the list of tasks - } - - private RunExecution createRunExecutionWithExecutionTime(ExecutionStatus executionStatus, String executionTime) { - RunExecution runExecution = new RunExecution(executionStatus); - runExecution.setExecutionTime(executionTime); - return runExecution; - } - - private RunExecution createRunExecutionWithMemory(ExecutionStatus executionStatus, Double memoryRequirementsGB) { - RunExecution runExecution = new RunExecution(executionStatus); - runExecution.setMemoryRequirementsGB(memoryRequirementsGB); - return runExecution; - } - - private RunExecution createRunExecutionWithCpu(ExecutionStatus executionStatus, Integer cpuRequirements) { - RunExecution runExecution = new RunExecution(executionStatus); - runExecution.setCpuRequirements(cpuRequirements); - return runExecution; - } - - private RunExecution createRunExecutionWithCost(ExecutionStatus executionStatus, Double costValue) { - RunExecution runExecution = new RunExecution(executionStatus); - runExecution.setCost(new Cost(costValue)); - return runExecution; - } - - private ExecutionsRequestBody createExecutionsRequestBody(List runExecutions) { - ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody(); - executionsRequestBody.setRunExecutions(runExecutions); - return executionsRequestBody; - } - - private ExecutionsRequestBody createExecutionsRequestBody(List runExecutions, List taskExecutions) { - ExecutionsRequestBody executionsRequestBody = createExecutionsRequestBody(runExecutions); - executionsRequestBody.setTaskExecutions(taskExecutions); - return executionsRequestBody; - } - - private TaskExecutions createTaskExecutions(RunExecution... taskExecutions) { - TaskExecutions taskExecutionsSet = new TaskExecutions(); - taskExecutionsSet.setTaskExecutions(List.of(taskExecutions)); - return taskExecutionsSet; - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregatorTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregatorTest.java deleted file mode 100644 index 6f0667d1..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/ExecutionTimeAggregatorTest.java +++ /dev/null @@ -1,88 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.openapi.client.model.ExecutionTimeMetric; -import java.util.List; -import java.util.Optional; -import org.junit.jupiter.api.Test; - -class ExecutionTimeAggregatorTest { - private static final ExecutionTimeAggregator EXECUTION_TIME_AGGREGATOR = new ExecutionTimeAggregator(); - - /** - * Tests that the aggregator calculates the correct workflow RunExecution from a list of task RunExecutions. - */ - @Test - void testGetWorkflowExecutionFromTaskExecutions() { - // Empty TaskExecutions should return Optional.empty() - Optional workflowExecution = EXECUTION_TIME_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(new TaskExecutions()); - assertTrue(workflowExecution.isEmpty()); - - // The workflow execution generated from a single task should have the executionTime as the one task - String tenSecondsExecutionTime = "PT10S"; - TaskExecutions taskExecutions = new TaskExecutions(); - taskExecutions.setTaskExecutions(List.of(createRunExecution(tenSecondsExecutionTime))); - workflowExecution = EXECUTION_TIME_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(tenSecondsExecutionTime, workflowExecution.get().getExecutionTime()); - - // The workflow execution generated from multiple tasks should calculate the executionTime from the earliest and latest dateExecuted fields from the tasks - List iso8601Dates = List.of("2023-11-09T21:54:00.571285905Z", "2023-11-09T21:54:10.571285905Z", "2023-11-09T21:54:20.571285905Z"); // 10 second increments - // Create 3 tasks where the difference between each dateExecuted is 10 seconds - taskExecutions.setTaskExecutions(iso8601Dates.stream() - .map(dateExecuted -> { - // Setting the execution time, but this isn't used to calculate the workflow RunExecution execution time - RunExecution taskExecution = createRunExecution(tenSecondsExecutionTime); - taskExecution.setDateExecuted(dateExecuted); - return taskExecution; - }) - .toList() - ); - workflowExecution = EXECUTION_TIME_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - // Should be 30 seconds because the difference between the earliest and latest date executed is 20 seconds and the duration of the last task is 10s - assertEquals("PT30S", workflowExecution.get().getExecutionTime()); - } - - /** - * Tests that the aggregator calculates the correct aggregated metric from a list of workflow RunExecutions - */ - @Test - void testGetAggregatedMetricFromWorkflowExecutions() { - // Empty list should return Optional.empty() - Optional executionTimeMetric = EXECUTION_TIME_AGGREGATOR.getAggregatedMetricFromExecutions(List.of()); - assertTrue(executionTimeMetric.isEmpty()); - - // Test the metric calculated from a single workflow execution. The min, max, and average should be the same value as the single execution - List workflowExecutions = List.of(createRunExecution("PT10S")); // 10 seconds - executionTimeMetric = EXECUTION_TIME_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(executionTimeMetric.isPresent()); - assertEquals(10.0, executionTimeMetric.get().getMinimum()); - assertEquals(10.0, executionTimeMetric.get().getMaximum()); - assertEquals(10.0, executionTimeMetric.get().getAverage()); - assertEquals(1, executionTimeMetric.get().getNumberOfDataPointsForAverage()); - - // Test the metric calculated from multiple workflow executions. - workflowExecutions = List.of( - createRunExecution("PT10S"), // 10 seconds - createRunExecution("PT20S"), // 20 seconds - createRunExecution("PT30S") // 30 seconds - ); - executionTimeMetric = EXECUTION_TIME_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(executionTimeMetric.isPresent()); - assertEquals(10.0, executionTimeMetric.get().getMinimum()); - assertEquals(30.0, executionTimeMetric.get().getMaximum()); - assertEquals(20.0, executionTimeMetric.get().getAverage()); - assertEquals(3, executionTimeMetric.get().getNumberOfDataPointsForAverage()); - } - - private RunExecution createRunExecution(String executionTime) { - RunExecution runExecution = new RunExecution(); - runExecution.setExecutionTime(executionTime); - return runExecution; - } -} diff --git a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/MemoryAggregatorTest.java b/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/MemoryAggregatorTest.java deleted file mode 100644 index 7a6437dd..00000000 --- a/metricsaggregator/src/test/java/io/dockstore/metricsaggregator/helper/MemoryAggregatorTest.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.dockstore.metricsaggregator.helper; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.dockstore.common.metrics.RunExecution; -import io.dockstore.common.metrics.TaskExecutions; -import io.dockstore.openapi.client.model.MemoryMetric; -import java.util.List; -import java.util.Optional; -import org.junit.jupiter.api.Test; - -class MemoryAggregatorTest { - private static final MemoryAggregator MEMORY_AGGREGATOR = new MemoryAggregator(); - - /** - * Tests that the aggregator calculates the correct workflow RunExecution from a TaskExecutions that represents a list of task RunExecutions that were executed during a single workflow execution. - */ - @Test - void testGetWorkflowExecutionFromTaskExecutions() { - // Empty TaskExecutions should return Optional.empty() - Optional workflowExecution = MEMORY_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(new TaskExecutions()); - assertTrue(workflowExecution.isEmpty()); - - // The workflow execution generated from a single task should have the same memoryRequirementsGB as the one task - TaskExecutions taskExecutions = new TaskExecutions(); - taskExecutions.setTaskExecutions(List.of(createRunExecution(1.0))); - workflowExecution = MEMORY_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(1.0, workflowExecution.get().getMemoryRequirementsGB()); - - // The workflow execution generated from multiple tasks should have the memoryRequirementsGB from the highest memoryRequirementsGB from the list of tasks - taskExecutions.setTaskExecutions(List.of( - createRunExecution(1.0), - createRunExecution(2.0), - createRunExecution(3.0) - )); - workflowExecution = MEMORY_AGGREGATOR.getWorkflowExecutionFromTaskExecutions(taskExecutions); - assertTrue(workflowExecution.isPresent()); - assertEquals(3.0, workflowExecution.get().getMemoryRequirementsGB()); - } - - /** - * Tests that the aggregator calculates the correct aggregated metric from a list of workflow RunExecutions - */ - @Test - void testGetAggregatedMetricFromWorkflowExecutions() { - // Empty list should return Optional.empty() - Optional memoryMetric = MEMORY_AGGREGATOR.getAggregatedMetricFromExecutions(List.of()); - assertTrue(memoryMetric.isEmpty()); - - // Test the memory metric calculated from a single workflow execution. The min, max, and average should be the same value as the single execution - List workflowExecutions = List.of(createRunExecution(1.0)); - memoryMetric = MEMORY_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(memoryMetric.isPresent()); - assertEquals(1.0, memoryMetric.get().getMinimum()); - assertEquals(1.0, memoryMetric.get().getMaximum()); - assertEquals(1.0, memoryMetric.get().getAverage()); - assertEquals(1, memoryMetric.get().getNumberOfDataPointsForAverage()); - - // Test the memory metric calculated from multiple workflow executions. - workflowExecutions = List.of( - createRunExecution(2.0), - createRunExecution(4.0), - createRunExecution(6.0) - ); - memoryMetric = MEMORY_AGGREGATOR.getAggregatedMetricFromExecutions(workflowExecutions); - assertTrue(memoryMetric.isPresent()); - assertEquals(2.0, memoryMetric.get().getMinimum()); - assertEquals(6.0, memoryMetric.get().getMaximum()); - assertEquals(4.0, memoryMetric.get().getAverage()); - assertEquals(3, memoryMetric.get().getNumberOfDataPointsForAverage()); - } - - private RunExecution createRunExecution(Double memoryRequirementGB) { - RunExecution runExecution = new RunExecution(); - runExecution.setMemoryRequirementsGB(memoryRequirementGB); - return runExecution; - } -}