Skip to content

Commit

Permalink
rolled in review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
scmacdon committed Dec 13, 2024
1 parent 029f216 commit 6bafaf9
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12,762 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,20 @@ public class FirehoseScenario {
private static String deliveryStreamName;

public static void main(String[] args) {
firehoseClient = FirehoseClient.builder().region(Region.US_EAST_1).build();
cloudWatchClient = CloudWatchClient.builder().region(Region.US_EAST_1).build();
final String usage = """
// Replace with your region and delivery stream name
deliveryStreamName = "stream35";
Usage:
<deliveryStreamName> \s
Where:
deliveryStreamName - The data stream name.\s
""";

if (args.length != 1) {
System.out.println(usage);
return;
}
deliveryStreamName = args[0];

/*
See the Readme in the scenario folder for information about the sample_records.json file.
Expand All @@ -56,7 +65,7 @@ public static void main(String[] args) {
new TypeReference<>() {}
);

// Process individual records
// Process individual records.
System.out.println("Processing individual records...");
sampleData.subList(0, 100).forEach(record -> {
try {
Expand All @@ -70,15 +79,42 @@ public static void main(String[] args) {

// Process batch records.
System.out.println("Processing batch records...");
putRecordBatch(sampleData.subList(100, 200), 50);
putRecordBatch(sampleData.subList(100, sampleData.size()), 50);
monitorMetrics();
} catch (Exception e) {
System.out.println("Error processing records: " + e.getMessage());
} finally {
closeClients();
}

System.out.println("This concludes the AWS Firehose scenario...");
System.out.println("This concludes the Amazon Firehose scenario...");
}

/**
* Retrieves a singleton instance of the FirehoseClient.
*
* <p>If the {@code firehoseClient} instance is null, it is created using the
* {@code FirehoseClient.create()} method. Otherwise, the existing instance is returned.</p>
*
* @return the FirehoseClient instance
*/
private static FirehoseClient getFirehoseClient() {
if (firehoseClient == null) {
firehoseClient = FirehoseClient.create();
}
return firehoseClient;
}

/**
* Retrieves a singleton instance of the cloudWatchClient.
*
* @return the CloudWatchClient instance
*/
private static CloudWatchClient getCloudWatchClient() {
if (cloudWatchClient == null) {
cloudWatchClient = CloudWatchClient.create();
}
return cloudWatchClient;
}

/**
Expand All @@ -102,7 +138,7 @@ public static void putRecord(Map<String, Object> record) {
.record(firehoseRecord)
.build();

firehoseClient.putRecord(putRecordRequest);
getFirehoseClient().putRecord(putRecordRequest);
System.out.println("Record sent successfully: " + jsonRecord);
} catch (Exception e) {
System.out.println("Failed to send record. Error: " + e.getMessage());
Expand Down Expand Up @@ -140,7 +176,7 @@ public static void putRecordBatch(List<Map<String, Object>> records, int batchSi
.records(batchRecords)
.build();

PutRecordBatchResponse response = firehoseClient.putRecordBatch(request);
PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request);

if (response.failedPutCount() > 0) {
System.out.println("Failed to send " + response.failedPutCount() + " records in batch of " + batchRecords.size());
Expand Down Expand Up @@ -198,7 +234,7 @@ private static void monitorMetric(String metricName, Instant startTime, Instant
.statistics(Statistic.SUM)
.build();

GetMetricStatisticsResponse response = cloudWatchClient.getMetricStatistics(request);
GetMetricStatisticsResponse response = getCloudWatchClient().getMetricStatistics(request);
double totalSum = response.datapoints().stream()
.mapToDouble(Datapoint::sum)
.sum();
Expand Down
Loading

0 comments on commit 6bafaf9

Please sign in to comment.