From 8ea9e173450dff336cd571d857b4589a836f0919 Mon Sep 17 00:00:00 2001
From: Scott Macdonald <57190223+scmacdon@users.noreply.github.com>
Date: Fri, 13 Dec 2024 14:34:27 -0500
Subject: [PATCH] Java V2 Add the Firehose scenario (#7144)
---
.doc_gen/metadata/firehose_metadata.yaml | 24 +
javav2/example_code/firehose/README.md | 29 +-
javav2/example_code/firehose/pom.xml | 4 +
.../firehose/CreateDeliveryStream.java | 3 +-
.../com/example/firehose/DeleteStream.java | 3 +-
.../example/firehose/ListDeliveryStreams.java | 1 -
.../com/example/firehose/PutBatchRecords.java | 107 -
.../java/com/example/firehose/PutRecord.java | 74 -
.../java/com/example/firehose/StockTrade.java | 90 -
.../example/firehose/StockTradeGenerator.java | 97 -
.../firehose/scenario/FirehoseScenario.java | 222 +
.../com/example/firehose/scenario/README.md | 48 +
.../firehose/src/test/java/FirehoseTest.java | 39 +-
workflows/firehose/sample_records.json | 12752 ++++++++++++++++
14 files changed, 13111 insertions(+), 382 deletions(-)
delete mode 100644 javav2/example_code/firehose/src/main/java/com/example/firehose/PutBatchRecords.java
delete mode 100644 javav2/example_code/firehose/src/main/java/com/example/firehose/PutRecord.java
delete mode 100644 javav2/example_code/firehose/src/main/java/com/example/firehose/StockTrade.java
delete mode 100644 javav2/example_code/firehose/src/main/java/com/example/firehose/StockTradeGenerator.java
create mode 100644 javav2/example_code/firehose/src/main/java/com/example/firehose/scenario/FirehoseScenario.java
create mode 100644 javav2/example_code/firehose/src/main/java/com/example/firehose/scenario/README.md
create mode 100644 workflows/firehose/sample_records.json
diff --git a/.doc_gen/metadata/firehose_metadata.yaml b/.doc_gen/metadata/firehose_metadata.yaml
index 67034115239..f12ee6c114e 100644
--- a/.doc_gen/metadata/firehose_metadata.yaml
+++ b/.doc_gen/metadata/firehose_metadata.yaml
@@ -1,6 +1,14 @@
# zexi 0.4.0
firehose_PutRecord:
languages:
+ Java:
+ versions:
+ - sdk_version: 2
+ github: javav2/example_code/firehose
+ excerpts:
+ - description:
+ snippet_tags:
+ - firehose.java2.put_record.main
Python:
versions:
- sdk_version: 3
@@ -14,6 +22,14 @@ firehose_PutRecord:
firehose: {PutRecord}
firehose_PutRecordBatch:
languages:
+ Java:
+ versions:
+ - sdk_version: 2
+ github: javav2/example_code/firehose
+ excerpts:
+ - description:
+ snippet_tags:
+ - firehose.java2.put_batch_records.main
Rust:
versions:
- sdk_version: 1
@@ -40,6 +56,14 @@ firehose_Scenario_PutRecords:
synopsis: use &FH; to process individual and batch records.
category: Scenarios
languages:
+ Java:
+ versions:
+ - sdk_version: 2
+ github: javav2/example_code/firehose
+ excerpts:
+ - description: This example puts individual and batch records to &FH;.
+ snippet_tags:
+ - firehose.java2.scenario.main
Python:
versions:
- sdk_version: 3
diff --git a/javav2/example_code/firehose/README.md b/javav2/example_code/firehose/README.md
index 111206ed61c..d9b3ef310db 100644
--- a/javav2/example_code/firehose/README.md
+++ b/javav2/example_code/firehose/README.md
@@ -29,6 +29,21 @@ For prerequisites, see the [README](../../README.md#Prerequisites) in the `javav
+### Single actions
+
+Code excerpts that show you how to call individual service functions.
+
+- [PutRecord](src/main/java/com/example/firehose/scenario/FirehoseScenario.java#L92)
+- [PutRecordBatch](src/main/java/com/example/firehose/scenario/FirehoseScenario.java#L125)
+
+### Scenarios
+
+Code examples that show you how to accomplish a specific task by calling multiple
+functions within the same service.
+
+- [Put records to Data Firehose](src/main/java/com/example/firehose/scenario/FirehoseScenario.java)
+
+
@@ -42,6 +57,18 @@ For prerequisites, see the [README](../../README.md#Prerequisites) in the `javav
+#### Put records to Data Firehose
+
+This example shows you how to use Data Firehose to process individual and batch records.
+
+
+
+
+
+
+
+
+
### Tests
⚠ Running tests might result in charges to your AWS account.
@@ -68,4 +95,4 @@ in the `javav2` folder.
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-SPDX-License-Identifier: Apache-2.0
\ No newline at end of file
+SPDX-License-Identifier: Apache-2.0
diff --git a/javav2/example_code/firehose/pom.xml b/javav2/example_code/firehose/pom.xml
index 5b030d1b3fa..52f13f905ad 100644
--- a/javav2/example_code/firehose/pom.xml
+++ b/javav2/example_code/firehose/pom.xml
@@ -55,6 +55,10 @@
software.amazon.awssdk
secretsmanager
+
+ software.amazon.awssdk
+ cloudwatch
+
com.google.code.gson
gson
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/CreateDeliveryStream.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/CreateDeliveryStream.java
index d355fff0689..2a944d04977 100644
--- a/javav2/example_code/firehose/src/main/java/com/example/firehose/CreateDeliveryStream.java
+++ b/javav2/example_code/firehose/src/main/java/com/example/firehose/CreateDeliveryStream.java
@@ -36,7 +36,7 @@ public static void main(String[] args) {
if (args.length != 3) {
System.out.println(usage);
- System.exit(1);
+ return;
}
String bucketARN = args[0];
@@ -70,7 +70,6 @@ public static void createStream(FirehoseClient firehoseClient, String bucketARN,
} catch (FirehoseException e) {
System.out.println(e.getLocalizedMessage());
- System.exit(1);
}
}
}
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/DeleteStream.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/DeleteStream.java
index 21bca2787d7..c5d89ad47c7 100644
--- a/javav2/example_code/firehose/src/main/java/com/example/firehose/DeleteStream.java
+++ b/javav2/example_code/firehose/src/main/java/com/example/firehose/DeleteStream.java
@@ -33,7 +33,7 @@ public static void main(String[] args) {
if (args.length != 1) {
System.out.println(usage);
- System.exit(1);
+ return;
}
String streamName = args[0];
@@ -57,7 +57,6 @@ public static void delStream(FirehoseClient firehoseClient, String streamName) {
} catch (FirehoseException e) {
System.out.println(e.getLocalizedMessage());
- System.exit(1);
}
}
}
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/ListDeliveryStreams.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/ListDeliveryStreams.java
index 0dfca2db97b..eada7f2db20 100644
--- a/javav2/example_code/firehose/src/main/java/com/example/firehose/ListDeliveryStreams.java
+++ b/javav2/example_code/firehose/src/main/java/com/example/firehose/ListDeliveryStreams.java
@@ -42,7 +42,6 @@ public static void listStreams(FirehoseClient firehoseClient) {
} catch (FirehoseException e) {
System.out.println(e.getLocalizedMessage());
- System.exit(1);
}
}
}
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/PutBatchRecords.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/PutBatchRecords.java
deleted file mode 100644
index 63c351ba643..00000000000
--- a/javav2/example_code/firehose/src/main/java/com/example/firehose/PutBatchRecords.java
+++ /dev/null
@@ -1,107 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-package com.example.firehose;
-
-// snippet-start:[firehose.java2.put_batch_records.main]
-// snippet-start:[firehose.java2.put_batch_records.import]
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.firehose.FirehoseClient;
-import software.amazon.awssdk.services.firehose.model.Record;
-import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
-import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
-import software.amazon.awssdk.services.firehose.model.FirehoseException;
-import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
-import software.amazon.awssdk.core.SdkBytes;
-import java.util.ArrayList;
-import java.util.List;
-// snippet-end:[firehose.java2.put_batch_records.import]
-
-/**
- * Before running this Java V2 code example, set up your development
- * environment, including your credentials.
- *
- * For more information, see the following documentation topic:
- *
- * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
- */
-
-public class PutBatchRecords {
-
- public static void main(String[] args) {
- final String usage = """
-
- Usage:
- \s
-
- Where:
- streamName - The data stream name\s
- """;
-
- if (args.length != 1) {
- System.out.println(usage);
- System.exit(1);
- }
-
- String streamName = args[0];
- Region region = Region.US_WEST_2;
- FirehoseClient firehoseClient = FirehoseClient.builder()
- .region(region)
- .build();
-
- addStockTradeData(firehoseClient, streamName);
- firehoseClient.close();
- }
-
- public static void addStockTradeData(FirehoseClient firehoseClient, String streamName) {
- List recordList = new ArrayList<>();
- try {
- // Repeatedly send stock trades with a 100 milliseconds wait in between.
- StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();
- int index = 100;
-
- // Populate the list with StockTrade data.
- for (int x = 0; x < index; x++) {
- StockTrade trade = stockTradeGenerator.getRandomTrade();
- byte[] bytes = trade.toJsonAsBytes();
-
- Record myRecord = Record.builder()
- .data(SdkBytes.fromByteArray(bytes))
- .build();
-
- System.out.println("Adding trade: " + trade.toString());
- recordList.add(myRecord);
- Thread.sleep(100);
- }
-
- PutRecordBatchRequest recordBatchRequest = PutRecordBatchRequest.builder()
- .deliveryStreamName(streamName)
- .records(recordList)
- .build();
-
- PutRecordBatchResponse recordResponse = firehoseClient.putRecordBatch(recordBatchRequest);
- System.out.println("The number of records added is: " + recordResponse.requestResponses().size());
-
- // Check the details of all records in this batch operation.
- String errorMsg = "";
- String errorCode = "";
- List results = recordResponse.requestResponses();
- for (PutRecordBatchResponseEntry result : results) {
-
- // Returns null if there is no error.
- errorCode = result.errorCode();
- if (errorCode == null) {
- System.out.println("Record " + result.recordId() + " was successfully added!");
- } else {
- errorMsg = result.errorMessage();
- System.out.println("Error code for record ID : " + result.recordId() + " is " + errorMsg);
- }
- }
-
- } catch (FirehoseException | InterruptedException e) {
- System.out.println(e.getLocalizedMessage());
- System.exit(1);
- }
- }
-}
-// snippet-end:[firehose.java2.put_batch_records.main]
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/PutRecord.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/PutRecord.java
deleted file mode 100644
index a584c0ab483..00000000000
--- a/javav2/example_code/firehose/src/main/java/com/example/firehose/PutRecord.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-package com.example.firehose;
-
-// snippet-start:[firehose.java2.put_record.import]
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.firehose.FirehoseClient;
-import software.amazon.awssdk.services.firehose.model.FirehoseException;
-import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.services.firehose.model.Record;
-import software.amazon.awssdk.services.firehose.model.PutRecordResponse;
-// snippet-end:[firehose.java2.put_record.import]
-
-/**
- * Before running this Java V2 code example, set up your development
- * environment, including your credentials.
- *
- * For more information, see the following documentation topic:
- *
- * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
- */
-public class PutRecord {
- public static void main(String[] args) {
- final String usage = """
-
- Usage:
- \s
-
- Where:
- textValue - The text used as the data to write to the data stream.\s
- streamName - The data stream name.\s
- """;
-
- if (args.length != 2) {
- System.out.println(usage);
- System.exit(1);
- }
-
- String textValue = args[0];
- String streamName = args[1];
- Region region = Region.US_WEST_2;
- FirehoseClient firehoseClient = FirehoseClient.builder()
- .region(region)
- .build();
-
- putSingleRecord(firehoseClient, textValue, streamName);
- firehoseClient.close();
- }
-
- // snippet-start:[firehose.java2.put_record.main]
- public static void putSingleRecord(FirehoseClient firehoseClient, String textValue, String streamName) {
- try {
- SdkBytes sdkBytes = SdkBytes.fromByteArray(textValue.getBytes());
- Record record = Record.builder()
- .data(sdkBytes)
- .build();
-
- PutRecordRequest recordRequest = PutRecordRequest.builder()
- .deliveryStreamName(streamName)
- .record(record)
- .build();
-
- PutRecordResponse recordResponse = firehoseClient.putRecord(recordRequest);
- System.out.println("The record ID is " + recordResponse.recordId());
-
- } catch (FirehoseException e) {
- System.out.println(e.getLocalizedMessage());
- System.exit(1);
- }
- }
-}
-// snippet-end:[firehose.java2.put_record.main]
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/StockTrade.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/StockTrade.java
deleted file mode 100644
index 6fd851cf6f5..00000000000
--- a/javav2/example_code/firehose/src/main/java/com/example/firehose/StockTrade.java
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-package com.example.firehose;
-
-import java.io.IOException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Captures the key elements of a stock trade, such as the ticker symbol, price,
- * number of shares, the type of the trade (buy or sell), and an id uniquely
- * identifying
- * the trade.
- */
-public class StockTrade {
-
- private final static ObjectMapper JSON = new ObjectMapper();
- static {
- JSON.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- /**
- * Represents the type of the stock trade eg buy or sell.
- */
- public enum TradeType {
- BUY,
- SELL
- }
-
- private String tickerSymbol;
- private TradeType tradeType;
- private double price;
- private long quantity;
- private long id;
-
- public StockTrade() {
- }
-
- public StockTrade(String tickerSymbol, TradeType tradeType, double price, long quantity, long id) {
- this.tickerSymbol = tickerSymbol;
- this.tradeType = tradeType;
- this.price = price;
- this.quantity = quantity;
- this.id = id;
- }
-
- public String getTickerSymbol() {
- return tickerSymbol;
- }
-
- public TradeType getTradeType() {
- return tradeType;
- }
-
- public double getPrice() {
- return price;
- }
-
- public long getQuantity() {
- return quantity;
- }
-
- public long getId() {
- return id;
- }
-
- public byte[] toJsonAsBytes() {
- try {
- return JSON.writeValueAsBytes(this);
- } catch (IOException e) {
- return null;
- }
- }
-
- public static StockTrade fromJsonAsBytes(byte[] bytes) {
- try {
- return JSON.readValue(bytes, StockTrade.class);
- } catch (IOException e) {
- return null;
- }
- }
-
- @Override
- public String toString() {
- return String.format("ID %d: %s %d shares of %s for $%.02f",
- id, tradeType, quantity, tickerSymbol, price);
- }
-
-}
\ No newline at end of file
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/StockTradeGenerator.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/StockTradeGenerator.java
deleted file mode 100644
index f9480615619..00000000000
--- a/javav2/example_code/firehose/src/main/java/com/example/firehose/StockTradeGenerator.java
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-package com.example.firehose;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Generates random stock trades by picking randomly from a collection of
- * stocks, assigning a
- * random price based on the mean, and picking a random quantity for the shares.
- *
- */
-
-public class StockTradeGenerator {
-
- private static final List STOCK_PRICES = new ArrayList<>();
- static {
- STOCK_PRICES.add(new StockPrice("AAPL", 119.72));
- STOCK_PRICES.add(new StockPrice("XOM", 91.56));
- STOCK_PRICES.add(new StockPrice("GOOG", 527.83));
- STOCK_PRICES.add(new StockPrice("BRK.A", 223999.88));
- STOCK_PRICES.add(new StockPrice("MSFT", 42.36));
- STOCK_PRICES.add(new StockPrice("WFC", 54.21));
- STOCK_PRICES.add(new StockPrice("JNJ", 99.78));
- STOCK_PRICES.add(new StockPrice("WMT", 85.91));
- STOCK_PRICES.add(new StockPrice("CHL", 66.96));
- STOCK_PRICES.add(new StockPrice("GE", 24.64));
- STOCK_PRICES.add(new StockPrice("NVS", 102.46));
- STOCK_PRICES.add(new StockPrice("PG", 85.05));
- STOCK_PRICES.add(new StockPrice("JPM", 57.82));
- STOCK_PRICES.add(new StockPrice("RDS.A", 66.72));
- STOCK_PRICES.add(new StockPrice("CVX", 110.43));
- STOCK_PRICES.add(new StockPrice("PFE", 33.07));
- STOCK_PRICES.add(new StockPrice("FB", 74.44));
- STOCK_PRICES.add(new StockPrice("VZ", 49.09));
- STOCK_PRICES.add(new StockPrice("PTR", 111.08));
- STOCK_PRICES.add(new StockPrice("BUD", 120.39));
- STOCK_PRICES.add(new StockPrice("ORCL", 43.40));
- STOCK_PRICES.add(new StockPrice("KO", 41.23));
- STOCK_PRICES.add(new StockPrice("T", 34.64));
- STOCK_PRICES.add(new StockPrice("DIS", 101.73));
- STOCK_PRICES.add(new StockPrice("AMZN", 370.56));
- }
-
- /** The ratio of the deviation from the mean price **/
- private static final double MAX_DEVIATION = 0.2; // ie 20%
-
- /** The number of shares is picked randomly between 1 and the MAX_QUANTITY **/
- private static final int MAX_QUANTITY = 10000;
-
- /** Probability of trade being a sell **/
- private static final double PROBABILITY_SELL = 0.4; // ie 40%
-
- private final Random random = new Random();
- private AtomicLong id = new AtomicLong(1);
-
- /**
- * Return a random stock trade with a unique id every time.
- *
- */
- public StockTrade getRandomTrade() {
- // pick a random stock
- StockPrice stockPrice = STOCK_PRICES.get(random.nextInt(STOCK_PRICES.size()));
- // pick a random deviation between -MAX_DEVIATION and +MAX_DEVIATION
- double deviation = (random.nextDouble() - 0.5) * 2.0 * MAX_DEVIATION;
- // set the price using the deviation and mean price
- double price = stockPrice.price * (1 + deviation);
- // round price to 2 decimal places
- price = Math.round(price * 100.0) / 100.0;
-
- // set the trade type to buy or sell depending on the probability of sell
- StockTrade.TradeType tradeType = StockTrade.TradeType.BUY;
- if (random.nextDouble() < PROBABILITY_SELL) {
- tradeType = StockTrade.TradeType.SELL;
- }
-
- // randomly pick a quantity of shares
- long quantity = random.nextInt(MAX_QUANTITY) + 1; // add 1 because nextInt() will return between 0 (inclusive)
- // and MAX_QUANTITY (exclusive). we want at least 1 share.
-
- return new StockTrade(stockPrice.tickerSymbol, tradeType, price, quantity, id.getAndIncrement());
- }
-
- private static class StockPrice {
- String tickerSymbol;
- double price;
-
- StockPrice(String tickerSymbol, double price) {
- this.tickerSymbol = tickerSymbol;
- this.price = price;
- }
- }
-}
diff --git a/javav2/example_code/firehose/src/main/java/com/example/firehose/scenario/FirehoseScenario.java b/javav2/example_code/firehose/src/main/java/com/example/firehose/scenario/FirehoseScenario.java
new file mode 100644
index 00000000000..56ec8e85625
--- /dev/null
+++ b/javav2/example_code/firehose/src/main/java/com/example/firehose/scenario/FirehoseScenario.java
@@ -0,0 +1,222 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.example.firehose.scenario;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+import software.amazon.awssdk.services.cloudwatch.model.*;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.firehose.model.*;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+// snippet-start:[firehose.java2.scenario.main]
+/**
+ * Amazon Firehose Scenario example using Java V2 SDK.
+ *
+ * Demonstrates individual and batch record processing,
+ * and monitoring Firehose delivery stream metrics.
+ */
+public class FirehoseScenario {
+
+ private static FirehoseClient firehoseClient;
+ private static CloudWatchClient cloudWatchClient;
+
+ public static void main(String[] args) {
+ final String usage = """
+ Usage:
+
+ Where:
+ deliveryStreamName - The Firehose delivery stream name.
+ """;
+
+ if (args.length != 1) {
+ System.out.println(usage);
+ return;
+ }
+
+ String deliveryStreamName = args[0];
+
+ try {
+ // Read and parse sample data.
+ String jsonContent = readJsonFile("sample_records.json");
+ ObjectMapper objectMapper = new ObjectMapper();
+ List