Skip to content

Commit

Permalink
Aggregate metrics in parallel
Browse files (browse the repository at this point in the history)
  • Loading branch information
kathy-t committed Dec 14, 2023
1 parent c0277ac commit fdb4c75
Showing 1 changed file with 59 additions and 44 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -48,6 +48,9 @@ public class MetricsAggregatorS3Client {

// Logger for this class and a Gson instance shared by all instances (both static).
private static final Logger LOG = LoggerFactory.getLogger(MetricsAggregatorS3Client.class);
private static final Gson GSON = new Gson();
// Running totals that aggregateMetrics() reports in its final summary log line.
// NOTE(review): these are plain int fields incremented with ++ inside aggregateMetricsForDirectory(),
// which aggregateMetrics() invokes from a parallel stream. ++ on an int field is a non-atomic
// read-modify-write, so concurrent increments can be lost and the logged totals may undercount.
// Consider java.util.concurrent.atomic.AtomicInteger (or LongAdder) instead.
// NOTE(review): no reset is visible in this excerpt, so the totals would presumably carry over if
// aggregateMetrics() is called more than once on the same instance - confirm.
private int numberOfDirectoriesProcessed = 0;
private int numberOfMetricsSubmitted = 0;
private int numberOfMetricsSkipped = 0;

// Presumably set from the constructor's bucketName argument - the constructor body is not visible here.
private final String bucketName;

Expand All @@ -67,60 +70,72 @@ public MetricsAggregatorS3Client(String bucketName, String s3EndpointOverride) t
}

/**
 * Aggregates the metrics of every directory returned by getDirectories() and submits the results
 * through the given API client. Returns early, after logging, when there are no directories.
 *
 * NOTE(review): this listing appears to be a rendered diff whose +/- markers were lost. Lines of the
 * previous sequential implementation (the System.out messages and the nested for loops) are
 * interleaved with lines of the new implementation (the LOG calls and the parallel stream), so the
 * span does not read as one compilable method. The inline comments mark which variant each part
 * appears to belong to.
 *
 * @param extendedGa4GhApi API client whose aggregatedMetricsPut is called with each aggregated result
 */
public void aggregateMetrics(ExtendedGa4GhApi extendedGa4GhApi) {
LOG.info("Getting directories to process...");
List<S3DirectoryInfo> metricsDirectories = getDirectories();

// Nothing to aggregate: report it and stop.
if (metricsDirectories.isEmpty()) {
System.out.println("No directories found to aggregate metrics");
LOG.info("No directories found to aggregate metrics");
return;
}

// --- Apparently the previous implementation: directories are processed one after another. ---
System.out.println("Aggregating metrics...");
for (S3DirectoryInfo directoryInfo : metricsDirectories) {
String toolId = directoryInfo.toolId();
String versionName = directoryInfo.versionId();
List<String> platforms = directoryInfo.platforms();
String platformsString = String.join(", ", platforms);
String versionS3KeyPrefix = directoryInfo.versionS3KeyPrefix();

// Collect metrics for each platform, so we can calculate metrics across all platforms
List<Metrics> allMetrics = new ArrayList<>();
for (String platform : platforms) {
ExecutionsRequestBody allSubmissions;
try {
allSubmissions = getExecutions(toolId, versionName, platform);
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e);
continue; // Skip this platform; the loop moves on to the directory's next platform
}

// Aggregate this platform's executions, submit them, and remember them for the all-platforms pass.
try {
getAggregatedMetrics(allSubmissions).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, platform, toolId, versionName);
System.out.printf("Aggregated metrics for tool ID %s, version %s, platform %s from directory %s%n", toolId, versionName, platform, versionS3KeyPrefix);
allMetrics.add(metrics);
});
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e);
// Continue aggregating metrics for other platforms
}
// --- Apparently the new implementation: per-directory work is delegated and run in parallel. ---
LOG.info("Aggregating metrics for {} directories", metricsDirectories.size());
// Each directory is handed to aggregateMetricsForDirectory from a parallel stream, so that
// method may run on several threads at the same time.
metricsDirectories.stream()
.parallel()
.forEach(directoryInfo -> aggregateMetricsForDirectory(directoryInfo, extendedGa4GhApi));
// NOTE(review): the three counters logged here are plain int fields that the parallel tasks
// increment with ++ (not atomic), so the reported totals may be lower than the real ones.
LOG.info("Completed aggregating metrics. Processed {} directories, submitted {} metrics, and skipped {} metrics", numberOfDirectoriesProcessed, numberOfMetricsSubmitted, numberOfMetricsSkipped);
}

/**
 * Aggregates and submits the metrics of a single directory: once for each of its platforms, then
 * once across all platforms (submitted under Partner.ALL). Failures are logged and do not propagate.
 * Called by aggregateMetrics() from a parallel stream, so several invocations may run concurrently.
 *
 * NOTE(review): this listing appears to be a rendered diff whose +/- markers were lost. A removed,
 * previously nested copy of the all-platforms block and a removed System.out line are interleaved
 * with the new method body, so the span does not read as one compilable method.
 *
 * @param directoryInfo supplies the tool ID, version ID, platforms, and version S3 key prefix to process
 * @param extendedGa4GhApi API client whose aggregatedMetricsPut is called with each aggregated result
 */
private void aggregateMetricsForDirectory(S3DirectoryInfo directoryInfo, ExtendedGa4GhApi extendedGa4GhApi) {
LOG.info("Processing directory {}", directoryInfo);
String toolId = directoryInfo.toolId();
String versionName = directoryInfo.versionId();
List<String> platforms = directoryInfo.platforms();
String platformsString = String.join(", ", platforms);
String versionS3KeyPrefix = directoryInfo.versionS3KeyPrefix();

// Collect metrics for each platform, so we can calculate metrics across all platforms
List<Metrics> allMetrics = new ArrayList<>();
for (String platform : platforms) {
ExecutionsRequestBody allSubmissions;
try {
allSubmissions = getExecutions(toolId, versionName, platform);
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e);
// NOTE(review): non-atomic ++ on a shared int field while this method runs in a parallel stream;
// also counted per platform here but per directory in the else branch further down.
++numberOfMetricsSkipped;
continue; // Skip this platform; the loop moves on to the directory's next platform
}

// --- Apparently removed lines: the old copy of the all-platforms block (repeated after the loop below). ---
if (!allMetrics.isEmpty()) {
// Calculate metrics across all platforms by aggregating the aggregated metrics from each platform
try {
getAggregatedMetrics(new ExecutionsRequestBody().aggregatedExecutions(allMetrics)).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, Partner.ALL.name(), toolId, versionName);
System.out.printf("Aggregated metrics across all platforms (%s) for tool ID %s, version %s from directory %s%n",
platformsString, toolId, versionName, versionS3KeyPrefix);
allMetrics.add(metrics);
});
} catch (Exception e) {
LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformsString, toolId, versionName, versionS3KeyPrefix, e);
// Continue aggregating metrics for other directories
}
// --- New lines resume: aggregate this platform's executions, submit them, and keep them for the all-platforms pass. ---
try {
getAggregatedMetrics(allSubmissions).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, platform, toolId, versionName);
System.out.printf("Aggregated metrics for tool ID %s, version %s, platform %s from directory %s%n", toolId, versionName, platform, versionS3KeyPrefix);
allMetrics.add(metrics);
});
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e);
// Continue aggregating metrics for other platforms
}
}

if (!allMetrics.isEmpty()) {
// Calculate metrics across all platforms by aggregating the aggregated metrics from each platform
try {
getAggregatedMetrics(new ExecutionsRequestBody().aggregatedExecutions(allMetrics)).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, Partner.ALL.name(), toolId, versionName);
System.out.printf("Aggregated metrics across all platforms (%s) for tool ID %s, version %s from directory %s%n",
platformsString, toolId, versionName, versionS3KeyPrefix);
// NOTE(review): the combined result is appended to allMetrics after the list was already
// consumed, and nothing visible reads the list afterwards - possibly unintended.
allMetrics.add(metrics);
});
} catch (Exception e) {
LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformsString, toolId, versionName, versionS3KeyPrefix, e);
// Continue aggregating metrics for other directories
}
// NOTE(review): incremented once per directory, and also when the put above threw (the catch
// only logs) or the Optional was empty; non-atomic ++ under the parallel stream.
++numberOfMetricsSubmitted;
} else {
// No platform of this directory yielded aggregated metrics.
++numberOfMetricsSkipped;
}
// Apparently a removed line from the old aggregateMetrics().
System.out.println("Completed aggregating metrics");
// NOTE(review): non-atomic ++ and an unsynchronized read of a counter that other threads may be
// updating, so the logged progress value can be stale or skip numbers.
++numberOfDirectoriesProcessed;
LOG.info("Processed {} directories", numberOfDirectoriesProcessed);
}

/**
Expand Down

0 comments on commit fdb4c75

Please sign in to comment.