diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java
index 92c5b479..f660de51 100644
--- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java
+++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java
@@ -48,6 +48,10 @@ public class MetricsAggregatorS3Client {
 
     private static final Logger LOG = LoggerFactory.getLogger(MetricsAggregatorS3Client.class);
     private static final Gson GSON = new Gson();
+    // Atomic because aggregateMetrics updates these counters from a parallel stream; a plain int with ++ could lose updates
+    private final java.util.concurrent.atomic.AtomicInteger numberOfDirectoriesProcessed = new java.util.concurrent.atomic.AtomicInteger();
+    private final java.util.concurrent.atomic.AtomicInteger numberOfMetricsSubmitted = new java.util.concurrent.atomic.AtomicInteger();
+    private final java.util.concurrent.atomic.AtomicInteger numberOfMetricsSkipped = new java.util.concurrent.atomic.AtomicInteger();
 
     private final String bucketName;
 
@@ -67,60 +71,80 @@ public MetricsAggregatorS3Client(String bucketName, String s3EndpointOverride) t
     }
 
     public void aggregateMetrics(ExtendedGa4GhApi extendedGa4GhApi) {
+        LOG.info("Getting directories to process...");
         List metricsDirectories = getDirectories();
 
         if (metricsDirectories.isEmpty()) {
-            System.out.println("No directories found to aggregate metrics");
+            LOG.info("No directories found to aggregate metrics");
             return;
         }
 
-        System.out.println("Aggregating metrics...");
-        for (S3DirectoryInfo directoryInfo : metricsDirectories) {
-            String toolId = directoryInfo.toolId();
-            String versionName = directoryInfo.versionId();
-            List platforms = directoryInfo.platforms();
-            String platformsString = String.join(", ", platforms);
-            String versionS3KeyPrefix = directoryInfo.versionS3KeyPrefix();
-
-            // Collect metrics for each platform, so we can calculate metrics across all platforms
-            List allMetrics = new ArrayList<>();
-            for (String platform : platforms) {
-                ExecutionsRequestBody allSubmissions;
-                try {
-                    allSubmissions = getExecutions(toolId, versionName, platform);
-                } catch (Exception e) {
-                    LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e);
-                    continue; // Continue aggregating metrics for other directories
-                }
-
-                try {
-                    getAggregatedMetrics(allSubmissions).ifPresent(metrics -> {
-                        extendedGa4GhApi.aggregatedMetricsPut(metrics, platform, toolId, versionName);
-                        System.out.printf("Aggregated metrics for tool ID %s, version %s, platform %s from directory %s%n", toolId, versionName, platform, versionS3KeyPrefix);
-                        allMetrics.add(metrics);
-                    });
-                } catch (Exception e) {
-                    LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e);
-                    // Continue aggregating metrics for other platforms
-                }
+        LOG.info("Aggregating metrics for {} directories", metricsDirectories.size());
+        metricsDirectories.stream()
+                .parallel()
+                .forEach(directoryInfo -> aggregateMetricsForDirectory(directoryInfo, extendedGa4GhApi));
+        LOG.info("Completed aggregating metrics. Processed {} directories, submitted {} metrics, and skipped {} metrics", numberOfDirectoriesProcessed.get(), numberOfMetricsSubmitted.get(), numberOfMetricsSkipped.get());
+    }
+
+    /**
+     * Aggregates the metrics of one S3 directory: submits the aggregated metrics of each platform, then,
+     * if any platform produced metrics, submits the metrics aggregated across all platforms.
+     * Invoked concurrently from the parallel stream in aggregateMetrics, so shared counters are updated atomically.
+     *
+     * @param directoryInfo the tool ID, version, platforms and S3 key prefix of the directory to process
+     * @param extendedGa4GhApi the API that the aggregated metrics are submitted to
+     */
+    private void aggregateMetricsForDirectory(S3DirectoryInfo directoryInfo, ExtendedGa4GhApi extendedGa4GhApi) {
+        LOG.info("Processing directory {}", directoryInfo);
+        String toolId = directoryInfo.toolId();
+        String versionName = directoryInfo.versionId();
+        List platforms = directoryInfo.platforms();
+        String platformsString = String.join(", ", platforms);
+        String versionS3KeyPrefix = directoryInfo.versionS3KeyPrefix();
+
+        // Collect metrics for each platform, so we can calculate metrics across all platforms
+        List allMetrics = new ArrayList<>();
+        for (String platform : platforms) {
+            ExecutionsRequestBody allSubmissions;
+            try {
+                allSubmissions = getExecutions(toolId, versionName, platform);
+            } catch (Exception e) {
+                LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e);
+                numberOfMetricsSkipped.incrementAndGet();
+                continue; // Continue aggregating metrics for other directories
             }
 
-            if (!allMetrics.isEmpty()) {
-                // Calculate metrics across all platforms by aggregating the aggregated metrics from each platform
-                try {
-                    getAggregatedMetrics(new ExecutionsRequestBody().aggregatedExecutions(allMetrics)).ifPresent(metrics -> {
-                        extendedGa4GhApi.aggregatedMetricsPut(metrics, Partner.ALL.name(), toolId, versionName);
-                        System.out.printf("Aggregated metrics across all platforms (%s) for tool ID %s, version %s from directory %s%n",
-                                platformsString, toolId, versionName, versionS3KeyPrefix);
-                        allMetrics.add(metrics);
-                    });
-                } catch (Exception e) {
-                    LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformsString, toolId, versionName, versionS3KeyPrefix, e);
-                    // Continue aggregating metrics for other directories
-                }
+            try {
+                getAggregatedMetrics(allSubmissions).ifPresent(metrics -> {
+                    extendedGa4GhApi.aggregatedMetricsPut(metrics, platform, toolId, versionName);
+                    System.out.printf("Aggregated metrics for tool ID %s, version %s, platform %s from directory %s%n", toolId, versionName, platform, versionS3KeyPrefix);
+                    allMetrics.add(metrics);
+                });
+            } catch (Exception e) {
+                LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e);
+                // Continue aggregating metrics for other platforms
+            }
+        }
+
+        if (!allMetrics.isEmpty()) {
+            // Calculate metrics across all platforms by aggregating the aggregated metrics from each platform
+            try {
+                getAggregatedMetrics(new ExecutionsRequestBody().aggregatedExecutions(allMetrics)).ifPresent(metrics -> {
+                    extendedGa4GhApi.aggregatedMetricsPut(metrics, Partner.ALL.name(), toolId, versionName);
+                    System.out.printf("Aggregated metrics across all platforms (%s) for tool ID %s, version %s from directory %s%n",
+                            platformsString, toolId, versionName, versionS3KeyPrefix);
+                    allMetrics.add(metrics);
+                });
+            } catch (Exception e) {
+                LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformsString, toolId, versionName, versionS3KeyPrefix, e);
+                // Continue aggregating metrics for other directories
             }
+            numberOfMetricsSubmitted.incrementAndGet();
+        } else {
+            numberOfMetricsSkipped.incrementAndGet();
         }
-        System.out.println("Completed aggregating metrics");
+        int processedSoFar = numberOfDirectoriesProcessed.incrementAndGet();
+        LOG.info("Processed {} directories", processedSoFar);
     }
 
     /**