Skip to content

Commit

Permalink
Remove the java aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
kathy-t committed Sep 13, 2024
1 parent f219bcb commit e94b9c5
Show file tree
Hide file tree
Showing 29 changed files with 57 additions and 2,363 deletions.
10 changes: 0 additions & 10 deletions metricsaggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,6 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
</dependency>
<dependency>
<groupId>org.javamoney.moneta</groupId>
<artifactId>moneta-core</artifactId>
</dependency>
<dependency>
<groupId>javax.money</groupId>
<artifactId>money-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
Expand Down Expand Up @@ -326,8 +318,6 @@
<usedDependency>org.slf4j:slf4j-api</usedDependency>
<usedDependency>software.amazon.awssdk:s3</usedDependency>
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
<usedDependency>javax.money:money-api</usedDependency>
<usedDependency>org.javamoney.moneta:moneta-core</usedDependency>
<usedDependency>ch.qos.logback:logback-classic</usedDependency>
<usedDependency>ch.qos.logback:logback-core</usedDependency>
<usedDependency>com.google.guava:guava</usedDependency>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class MetricsAggregatorAthenaClient {
private final AtomicInteger numberOfVersionsSubmitted = new AtomicInteger(0);
private final AtomicInteger numberOfVersionsSkipped = new AtomicInteger(0);

public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config) {
public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config, boolean isDryRun) {
this.metricsBucketName = config.getS3Config().bucket();
this.athenaWorkgroup = config.getAthenaConfig().workgroup();

Expand All @@ -63,8 +63,10 @@ public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config) {
this.executionStatusAggregator = new ExecutionStatusAthenaAggregator(this, tableName);
this.validationStatusAggregator = new ValidationStatusAthenaAggregator(this, tableName);

AthenaAggregator.createDatabase(databaseName, this);
AthenaAggregator.createTable(tableName, metricsBucketName, metadataApi, this);
if (!isDryRun) {
AthenaAggregator.createDatabase(databaseName, this);
AthenaAggregator.createTable(tableName, metricsBucketName, metadataApi, this);
}
}

/**
Expand All @@ -73,10 +75,10 @@ public MetricsAggregatorAthenaClient(MetricsAggregatorConfig config) {
* @param extendedGa4GhApi
* @param skipPostingToDockstore
*/
public void aggregateMetrics(List<S3DirectoryInfo> s3DirectoriesToAggregate, ExtendedGa4GhApi extendedGa4GhApi, boolean skipPostingToDockstore) {
public void aggregateMetrics(List<S3DirectoryInfo> s3DirectoriesToAggregate, ExtendedGa4GhApi extendedGa4GhApi, boolean skipPostingToDockstore, boolean isDryRun) {
// Aggregate metrics for each directory
s3DirectoriesToAggregate.stream().parallel().forEach(s3DirectoryInfo -> {
Map<String, Metrics> platformToMetrics = getAggregatedMetricsForPlatforms(s3DirectoryInfo);
Map<String, Metrics> platformToMetrics = getAggregatedMetricsForPlatforms(s3DirectoryInfo, isDryRun);
if (platformToMetrics.isEmpty()) {
LOG.error("No metrics were aggregated for tool ID: {}, version {}", s3DirectoryInfo.toolId(), s3DirectoryInfo.versionId());
numberOfVersionsSkipped.incrementAndGet();
Expand Down Expand Up @@ -143,26 +145,35 @@ public List<QueryResultRow> executeQuery(String query) throws AwsServiceExceptio
* @param s3DirectoryInfo
* @return
*/
public Map<String, Metrics> getAggregatedMetricsForPlatforms(S3DirectoryInfo s3DirectoryInfo) {
public Map<String, Metrics> getAggregatedMetricsForPlatforms(S3DirectoryInfo s3DirectoryInfo, boolean isDryRun) {
LOG.info("Aggregating metrics for directory: {}", s3DirectoryInfo.versionS3KeyPrefix());
Map<String, Metrics> platformToMetrics = new HashMap<>();
AthenaTablePartition athenaTablePartition = s3DirectoryInfo.athenaTablePartition();
try {
// Calculate metrics for runexecutions
Map<String, ExecutionStatusMetric> executionStatusMetricByPlatform = executionStatusAggregator.createMetricByPlatform(athenaTablePartition);
// Calculate metrics for validationexecutions
Map<String, ValidationStatusMetric> validationStatusMetricByPlatform = validationStatusAggregator.createMetricByPlatform(athenaTablePartition);

s3DirectoryInfo.platforms().forEach(platform -> {
ExecutionStatusMetric executionStatusMetric = executionStatusMetricByPlatform.get(platform);
ValidationStatusMetric validationStatusMetric = validationStatusMetricByPlatform.get(platform);

if (executionStatusMetric != null || validationStatusMetric != null) {
platformToMetrics.putIfAbsent(platform, new Metrics().executionStatusCount(executionStatusMetric).validationStatus(validationStatusMetric));
LOG.info("Aggregated metrics for tool ID {}, version {}, platform {} from directory {}", s3DirectoryInfo.toolId(),
s3DirectoryInfo.versionId(), platform, s3DirectoryInfo.versionS3KeyPrefix());
}
});
if (isDryRun) {
executionStatusAggregator.printQuery(athenaTablePartition);
validationStatusAggregator.printQuery(athenaTablePartition);
} else {

// Calculate metrics for runexecutions
Map<String, ExecutionStatusMetric> executionStatusMetricByPlatform = executionStatusAggregator.createMetricByPlatform(
athenaTablePartition);
// Calculate metrics for validationexecutions
Map<String, ValidationStatusMetric> validationStatusMetricByPlatform = validationStatusAggregator.createMetricByPlatform(
athenaTablePartition);

s3DirectoryInfo.platforms().forEach(platform -> {
ExecutionStatusMetric executionStatusMetric = executionStatusMetricByPlatform.get(platform);
ValidationStatusMetric validationStatusMetric = validationStatusMetricByPlatform.get(platform);

if (executionStatusMetric != null || validationStatusMetric != null) {
platformToMetrics.putIfAbsent(platform,
new Metrics().executionStatusCount(executionStatusMetric).validationStatus(validationStatusMetric));
LOG.info("Aggregated metrics for tool ID {}, version {}, platform {} from directory {}", s3DirectoryInfo.toolId(),
s3DirectoryInfo.versionId(), platform, s3DirectoryInfo.versionS3KeyPrefix());
}
});
}
} catch (Exception e) {
// Log error and continue
LOG.error("Could not aggregate metrics for tool ID {}, version {}", s3DirectoryInfo.toolId(), s3DirectoryInfo.versionId(), e);
Expand Down
Loading

0 comments on commit e94b9c5

Please sign in to comment.