From fcf122f3a0c325a4ec896e38aec5e4f15d577e33 Mon Sep 17 00:00:00 2001 From: smirnovaae Date: Thu, 14 Mar 2024 12:05:43 -0700 Subject: [PATCH 1/7] multi part upload --- attribution-data-file-share/build.gradle | 1 + .../AttributionDataShareHandler.java | 36 +++++++++++++++ .../AttributionDataShareHelper.java | 45 +++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/attribution-data-file-share/build.gradle b/attribution-data-file-share/build.gradle index 37312ec..25cb490 100644 --- a/attribution-data-file-share/build.gradle +++ b/attribution-data-file-share/build.gradle @@ -13,6 +13,7 @@ repositories { dependencies { implementation 'com.amazonaws:aws-lambda-java-core:1.2.2' implementation 'com.amazonaws:aws-java-sdk-s3:1.12.529' + implementation 'software.amazon.awssdk:s3-transfer-manager:2.25.7' implementation 'org.postgresql:postgresql:42.7.2' implementation 'software.amazon.awssdk:s3:2.21.7' implementation 'software.amazon.awssdk:ssm:2.25.7' diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java index 81a21c1..b798eb7 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java @@ -4,6 +4,7 @@ import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import gov.cms.ab2d.lambdalibs.lib.FileUtil; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; @@ -40,6 +41,7 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co helper.copyDataToFile(dbConnection); helper.writeFileToFinalDestination(getS3Client(ENDPOINT, parameterStore)); + helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); } catch (NullPointerException | URISyntaxException | SQLException ex) { throwAttributionDataShareException(logger, ex); @@ -76,6 +78,40 @@ public S3Client getS3Client(String endpoint, AttributionParameterStore parameter } return client.build(); } + + public S3AsyncClient getAsyncS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException { + var client = S3AsyncClient.crtCreate(); + + + if (endpoint.equals(ENDPOINT)) { + var stsClient = StsClient + .builder() + .region(S3_REGION) + .build(); + + var request = AssumeRoleRequest + .builder() + .roleArn(parameterStore.getRole()) + .roleSessionName("roleSessionName") + .build(); + + var credentials = StsAssumeRoleCredentialsProvider + .builder() + .stsClient(stsClient) + .refreshRequest(request) + .build(); + + client = + S3AsyncClient.crtBuilder() + .credentialsProvider(credentials) + .region(S3_REGION) + .targetThroughputInGbps(20.0) + .minimumPartSizeInBytes(8 * 1025 * 1024L) + .build(); + } + return client; + } + AttributionDataShareHelper helperInit(String fileName, String fileFullPath, LambdaLogger logger) { return new AttributionDataShareHelper(fileName, fileFullPath, logger); } diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java index b05f25e..f689e1d 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java @@ -3,13 +3,20 @@ import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.s3.model.AmazonS3Exception; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; +import software.amazon.awssdk.transfer.s3.model.FileUpload; +import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.nio.file.Paths; import java.sql.*; import java.text.SimpleDateFormat; import java.util.Date; @@ -31,7 +38,13 @@ void copyDataToFile(Connection connection) { String date = new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(new Date()); try (var stmt = connection.createStatement(); var writer = new BufferedWriter(new FileWriter(fileFullPath, true))) { + long startSelect = System.currentTimeMillis(); var rs = getExecuteQuery(stmt); + long finishSelect = System.currentTimeMillis(); + + logger.log("Select TIME ms: ---------- " + (finishSelect - startSelect)); + + long startWrite = System.currentTimeMillis(); writer.write(FIRST_LINE + date); writer.newLine(); long records = 0; @@ -42,6 +55,9 @@ void copyDataToFile(Connection connection) { records++; } writer.write(LAST_LINE + date + String.format("%010d", records)); + + long finishWrite = System.currentTimeMillis(); + logger.log("Write TIME ms: ---------- " + (finishWrite - startWrite)); } catch (SQLException | IOException ex) { String errorMessage = "An error occurred while exporting data to a file. "; logger.log(errorMessage + ex.getMessage()); @@ -64,6 +80,8 @@ String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOu } void writeFileToFinalDestination(S3Client s3Client) { + logger.log("filename " + fileName); + long startUpload = System.currentTimeMillis(); try { var objectRequest = PutObjectRequest.builder() .bucket(getBucketName()) @@ -76,6 +94,33 @@ void writeFileToFinalDestination(S3Client s3Client) { logger.log(errorMessage + ex.getMessage()); throw new AttributionDataShareException(errorMessage, ex); } + + long finishUpload = System.currentTimeMillis(); + logger.log("Upload TIME ms: ---------- " + (finishUpload - startUpload)); + } + + String mtpUpload(S3AsyncClient s3AsyncClient) { + String currentDate = new SimpleDateFormat(REQ_FILE_NAME_PATTERN).format(new Date()); + var key = REQ_FILE_NAME + currentDate; + logger.log("MTPU KEY " + key); + long startUpload = System.currentTimeMillis(); + S3TransferManager transferManager = S3TransferManager.builder() + .s3Client(s3AsyncClient) + .build(); + + UploadFileRequest uploadFileRequest = UploadFileRequest.builder() + .putObjectRequest(b -> b.bucket(getBucketName()).key(getUploadPath() + key)) + .addTransferListener(LoggingTransferListener.create()) + .source(Paths.get(fileFullPath)) + .build(); + + FileUpload fileUpload = transferManager.uploadFile(uploadFileRequest); + + CompletedFileUpload uploadResult = fileUpload.completionFuture().join(); + + long finishUpload = System.currentTimeMillis(); + logger.log("Multipart Upload TIME ms: ---------- " + (finishUpload - startUpload)); + return uploadResult.response().eTag(); } public String getBucketName() { From 0e2d575199c00e09fb836fe6408142b998ee1c1a Mon Sep 17 00:00:00 2001 From: smirnovaae Date: Thu, 14 Mar 2024 12:39:13 -0700 Subject: [PATCH 2/7] multi part upload --- attribution-data-file-share/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/attribution-data-file-share/build.gradle b/attribution-data-file-share/build.gradle index 25cb490..dea8149 100644 --- a/attribution-data-file-share/build.gradle +++ b/attribution-data-file-share/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation 'com.amazonaws:aws-lambda-java-core:1.2.2' implementation 'com.amazonaws:aws-java-sdk-s3:1.12.529' implementation 'software.amazon.awssdk:s3-transfer-manager:2.25.7' + implementation 'software.amazon.awssdk.crt:aws-crt:0.29.11' implementation 'org.postgresql:postgresql:42.7.2' implementation 'software.amazon.awssdk:s3:2.21.7' implementation 'software.amazon.awssdk:ssm:2.25.7' From 0cb505dd852dc114c598f7812b0e8332e33f5382 Mon Sep 17 00:00:00 2001 From: smirnovaae Date: Thu, 14 Mar 2024 13:10:56 -0700 Subject: [PATCH 3/7] multi part upload --- .../AttributionDataShareHandler.java | 81 +++++++++++------- .../cms/ab2d/attributionDataShare/Utils.java | 84 +++++++++++++++++++ .../AttributionDataShareHandlerTest.java | 8 +- 3 files changed, 138 insertions(+), 35 deletions(-) create mode 100644 attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java index b798eb7..c9a5f1a 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java @@ -10,9 +10,7 @@ import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Paths; @@ -20,6 +18,9 @@ import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static gov.cms.ab2d.attributionDataShare.AttributionDataShareConstants.*; @@ -28,6 +29,20 @@ public class AttributionDataShareHandler implements RequestStreamHandler { // Writes out a file to the FILE_PATH. // I.E: "P.AB2D.NGD.REQ.D240209.T1122001" + String select1 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + + " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + + " FROM current_mbi\n" + + ") subquery\n" + + "WHERE row_num <= 10000"; + + String select2 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + + " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + + " FROM current_mbi\n" + + ") subquery\n" + + "WHERE row_num > 10000"; + + private static BufferedWriter bufferedWriter; + public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { LambdaLogger logger = context.getLogger(); logger.log("AttributionDataShare Lambda is started"); @@ -37,48 +52,52 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co String fileFullPath = FILE_PATH + fileName; var parameterStore = AttributionParameterStore.getParameterStore(); AttributionDataShareHelper helper = helperInit(fileName, fileFullPath, logger); + + int threadCount = 2; + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + CountDownLatch latch = new CountDownLatch(threadCount); + try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())){ + long startSelect = System.currentTimeMillis(); + + executorService.execute(new Utils(fileFullPath, select1, dbConnection, getWriter(fileFullPath), latch, logger)); + executorService.execute(new Utils(fileFullPath, select2, dbConnection, getWriter(fileFullPath), latch, logger)); + + latch.await(); + + long finishSelect = System.currentTimeMillis(); - helper.copyDataToFile(dbConnection); - helper.writeFileToFinalDestination(getS3Client(ENDPOINT, parameterStore)); - helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); + logger.log("Total Select TIME ms: ---------- " + (finishSelect - startSelect)); + // helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); - } catch (NullPointerException | URISyntaxException | SQLException ex) { + } catch (NullPointerException | SQLException ex) { throwAttributionDataShareException(logger, ex); + } catch (InterruptedException e) { + throw new RuntimeException(e); } finally { FileUtil.deleteDirectoryRecursion(Paths.get(fileFullPath)); logger.log("AttributionDataShare Lambda is completed"); } } - public S3Client getS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException { - var client = S3Client.builder() - .region(S3_REGION) - .endpointOverride(new URI(endpoint)); + private static synchronized BufferedWriter getWriter(String fileFullPath) + { + try{ + if( bufferedWriter == null ) + { + bufferedWriter = new BufferedWriter(new FileWriter(fileFullPath, true)); + } - if (endpoint.equals(ENDPOINT)) { - var stsClient = StsClient - .builder() - .region(S3_REGION) - .build(); - - var request = AssumeRoleRequest - .builder() - .roleArn(parameterStore.getRole()) - .roleSessionName("roleSessionName") - .build(); - - var credentials = StsAssumeRoleCredentialsProvider - .builder() - .stsClient(stsClient) - .refreshRequest(request) - .build(); - - client.credentialsProvider(credentials); + return bufferedWriter; + } + catch(Exception e) + { + throw new RuntimeException(e); } - return client.build(); } + + public S3AsyncClient getAsyncS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException { var client = S3AsyncClient.crtCreate(); diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java new file mode 100644 index 0000000..96670ca --- /dev/null +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java @@ -0,0 +1,84 @@ +package gov.cms.ab2d.attributionDataShare; + +import com.amazonaws.services.lambda.runtime.LambdaLogger; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.CountDownLatch; + +import static gov.cms.ab2d.attributionDataShare.AttributionDataShareConstants.*; + +public class Utils implements Runnable{ + String fileFullPath; + String select; + Connection connection; + BufferedWriter writer; + LambdaLogger logger; + private final CountDownLatch latch; + + public Utils(String fileFullPath, String select, Connection connection, BufferedWriter writer, CountDownLatch latch, LambdaLogger logger) { + this.fileFullPath = fileFullPath; + this.select = select; + this.connection = connection; + this.writer = writer; + this.logger = logger; + this.latch = latch; + } + + @Override + public void run() { + String date = new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(new Date()); + try (var stmt = connection.createStatement()){ + long startSelect = System.currentTimeMillis(); + var rs = getExecuteQuery(select, stmt); + long finishSelect = System.currentTimeMillis(); + + logger.log("Select TIME ms: ---------- " + (finishSelect - startSelect)); + + long startWrite = System.currentTimeMillis(); + writer.write(FIRST_LINE + date); + writer.newLine(); + long records = 0; + while (rs.next()) { + var line = getResponseLine(rs.getString(1), rs.getTimestamp(2), rs.getBoolean(3)); + writer.write(line); + writer.newLine(); + records++; + } + writer.write(LAST_LINE + date + String.format("%010d", records)); + + long finishWrite = System.currentTimeMillis(); + logger.log("Write TIME ms: ---------- " + (finishWrite - startWrite)); + } catch (SQLException | IOException ex) { + String errorMessage = "An error occurred while exporting data to a file. "; + logger.log(errorMessage + ex.getMessage()); + throw new AttributionDataShareException(errorMessage, ex); + } + finally { + latch.countDown(); + } + } + + String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOutFlag) { + var result = new StringBuilder(); + result.append(currentMbi); + // Adding spaces to the end of a string to achieve the required position index + if (currentMbi.length() < CURRENT_MBI_LENGTH) + result.append(" ".repeat(Math.max(0, CURRENT_MBI_LENGTH - currentMbi.length()))); + + if (effectiveDate != null) { + result.append(new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(effectiveDate)); + result.append((optOutFlag) ? 'Y' : 'N'); + } + return result.toString(); + } + + static ResultSet getExecuteQuery(String select, Statement statement) throws SQLException { + return statement.executeQuery(select); + } + +} diff --git a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java index 56231de..6869c65 100644 --- a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java +++ b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java @@ -49,8 +49,8 @@ void attributionDataShareExceptionTest() { assertThrows(AttributionDataShareException.class, () -> handler.throwAttributionDataShareException(LOGGER, ex)); } - @Test - void getS3ClientTest() throws URISyntaxException { - assertNotNull(handler.getS3Client(TEST_ENDPOINT, parameterStore)); - } +// @Test +// void getS3ClientTest() throws URISyntaxException { +// assertNotNull(handler.getS3Client(TEST_ENDPOINT, parameterStore)); +// } } From ba3b3506f200b106c46dca959662d77a638f660b Mon Sep 17 00:00:00 2001 From: smirnovaae Date: Thu, 14 Mar 2024 13:22:44 -0700 Subject: [PATCH 4/7] multi part upload --- .../AttributionDataShareHandler.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java index c9a5f1a..25edfd6 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java @@ -33,13 +33,25 @@ public class AttributionDataShareHandler implements RequestStreamHandler { " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + " FROM current_mbi\n" + ") subquery\n" + - "WHERE row_num <= 10000"; + "WHERE row_num <= 5000"; String select2 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + " FROM current_mbi\n" + ") subquery\n" + - "WHERE row_num > 10000"; + "WHERE row_num > 5000 And row_num <= 10000"; + + String select3 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + + " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + + " FROM current_mbi\n" + + ") subquery\n" + + "WHERE row_num > 10000 And row_num <= 20000"; + + String select4 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + + " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + + " FROM current_mbi\n" + + ") subquery\n" + + "WHERE row_num > 20000"; private static BufferedWriter bufferedWriter; @@ -53,7 +65,7 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co var parameterStore = AttributionParameterStore.getParameterStore(); AttributionDataShareHelper helper = helperInit(fileName, fileFullPath, logger); - int threadCount = 2; + int threadCount = 4; ExecutorService executorService = Executors.newFixedThreadPool(threadCount); CountDownLatch latch = new CountDownLatch(threadCount); @@ -62,6 +74,8 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co executorService.execute(new Utils(fileFullPath, select1, dbConnection, getWriter(fileFullPath), latch, logger)); executorService.execute(new Utils(fileFullPath, select2, dbConnection, getWriter(fileFullPath), latch, logger)); + executorService.execute(new Utils(fileFullPath, select3, dbConnection, getWriter(fileFullPath), latch, logger)); + executorService.execute(new Utils(fileFullPath, select4, dbConnection, getWriter(fileFullPath), latch, logger)); latch.await(); @@ -70,6 +84,15 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co logger.log("Total Select TIME ms: ---------- " + (finishSelect - startSelect)); // helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); + + //show me the file + BufferedReader br = new BufferedReader(new FileReader(fileFullPath)); + String line; + while ((line = br.readLine()) != null) { + logger.log(line); + } + + } catch (NullPointerException | SQLException ex) { throwAttributionDataShareException(logger, ex); } catch (InterruptedException e) { From f486e10331d0259eae9e10ab8a14181ab75fec4f Mon Sep 17 00:00:00 2001 From: smirnovaae Date: Thu, 14 Mar 2024 13:24:47 -0700 Subject: [PATCH 5/7] multi part upload --- .../AttributionDataShareHandler.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java index 25edfd6..2fd2e9c 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java @@ -69,7 +69,7 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co ExecutorService executorService = Executors.newFixedThreadPool(threadCount); CountDownLatch latch = new CountDownLatch(threadCount); - try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())){ + try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())) { long startSelect = System.currentTimeMillis(); executorService.execute(new Utils(fileFullPath, select1, dbConnection, getWriter(fileFullPath), latch, logger)); @@ -82,9 +82,10 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co long finishSelect = System.currentTimeMillis(); logger.log("Total Select TIME ms: ---------- " + (finishSelect - startSelect)); - // helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); + // helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); + helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); //show me the file BufferedReader br = new BufferedReader(new FileReader(fileFullPath)); String line; @@ -93,34 +94,27 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co } - } catch (NullPointerException | SQLException ex) { + } catch (NullPointerException | URISyntaxException | SQLException | InterruptedException ex) { throwAttributionDataShareException(logger, ex); - } catch (InterruptedException e) { - throw new RuntimeException(e); } finally { FileUtil.deleteDirectoryRecursion(Paths.get(fileFullPath)); logger.log("AttributionDataShare Lambda is completed"); } } - private static synchronized BufferedWriter getWriter(String fileFullPath) - { - try{ - if( bufferedWriter == null ) - { - bufferedWriter = new BufferedWriter(new FileWriter(fileFullPath, true)); + private static synchronized BufferedWriter getWriter(String fileFullPath) { + try { + if (bufferedWriter == null) { + bufferedWriter = new BufferedWriter(new FileWriter(fileFullPath, true)); } return bufferedWriter; - } - catch(Exception e) - { + } catch (Exception e) { throw new RuntimeException(e); } } - public S3AsyncClient getAsyncS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException { var client = S3AsyncClient.crtCreate(); From 93669e6bfbfa2131c858beefc91e5c60e5cb1ea5 Mon Sep 17 00:00:00 2001 From: smirnovaae Date: Thu, 14 Mar 2024 14:49:58 -0700 Subject: [PATCH 6/7] multi part upload --- .../AttributionDataShareHandler.java | 78 ++--------------- .../AttributionDataShareHelper.java | 27 +----- .../cms/ab2d/attributionDataShare/Utils.java | 84 ------------------- 3 files changed, 6 insertions(+), 183 deletions(-) delete mode 100644 attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java index 2fd2e9c..c3ace53 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java @@ -5,22 +5,19 @@ import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import gov.cms.ab2d.lambdalibs.lib.FileUtil; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -import java.io.*; -import java.net.URI; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.URISyntaxException; import java.nio.file.Paths; import java.sql.DriverManager; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import static gov.cms.ab2d.attributionDataShare.AttributionDataShareConstants.*; @@ -29,32 +26,6 @@ public class AttributionDataShareHandler implements RequestStreamHandler { // Writes out a file to the FILE_PATH. // I.E: "P.AB2D.NGD.REQ.D240209.T1122001" - String select1 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + - " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + - " FROM current_mbi\n" + - ") subquery\n" + - "WHERE row_num <= 5000"; - - String select2 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + - " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + - " FROM current_mbi\n" + - ") subquery\n" + - "WHERE row_num > 5000 And row_num <= 10000"; - - String select3 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + - " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + - " FROM current_mbi\n" + - ") subquery\n" + - "WHERE row_num > 10000 And row_num <= 20000"; - - String select4 = "SELECT mbi,effective_date,opt_out_flag FROM (\n" + - " SELECT *, ROW_NUMBER() OVER (ORDER BY mbi DESC) AS row_num\n" + - " FROM current_mbi\n" + - ") subquery\n" + - "WHERE row_num > 20000"; - - private static BufferedWriter bufferedWriter; - public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { LambdaLogger logger = context.getLogger(); logger.log("AttributionDataShare Lambda is started"); @@ -64,37 +35,12 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co String fileFullPath = FILE_PATH + fileName; var parameterStore = AttributionParameterStore.getParameterStore(); AttributionDataShareHelper helper = helperInit(fileName, fileFullPath, logger); - - int threadCount = 4; - ExecutorService executorService = Executors.newFixedThreadPool(threadCount); - CountDownLatch latch = new CountDownLatch(threadCount); - try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())) { - long startSelect = System.currentTimeMillis(); - - executorService.execute(new Utils(fileFullPath, select1, dbConnection, getWriter(fileFullPath), latch, logger)); - executorService.execute(new Utils(fileFullPath, select2, dbConnection, getWriter(fileFullPath), latch, logger)); - executorService.execute(new Utils(fileFullPath, select3, dbConnection, getWriter(fileFullPath), latch, logger)); - executorService.execute(new Utils(fileFullPath, select4, dbConnection, getWriter(fileFullPath), latch, logger)); - - latch.await(); - - long finishSelect = System.currentTimeMillis(); - - logger.log("Total Select TIME ms: ---------- " + (finishSelect - startSelect)); - // helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); - + helper.copyDataToFile(dbConnection); helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); - //show me the file - BufferedReader br = new BufferedReader(new FileReader(fileFullPath)); - String line; - while ((line = br.readLine()) != null) { - logger.log(line); - } - - } catch (NullPointerException | URISyntaxException | SQLException | InterruptedException ex) { + } catch (NullPointerException | URISyntaxException | SQLException ex) { throwAttributionDataShareException(logger, ex); } finally { FileUtil.deleteDirectoryRecursion(Paths.get(fileFullPath)); @@ -102,23 +48,9 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co } } - private static synchronized BufferedWriter getWriter(String fileFullPath) { - try { - if (bufferedWriter == null) { - bufferedWriter = new BufferedWriter(new FileWriter(fileFullPath, true)); - } - - return bufferedWriter; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public S3AsyncClient getAsyncS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException { var client = S3AsyncClient.crtCreate(); - if (endpoint.equals(ENDPOINT)) { var stsClient = StsClient .builder() diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java index f689e1d..d0570a9 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java @@ -1,11 +1,7 @@ package gov.cms.ab2d.attributionDataShare; import com.amazonaws.services.lambda.runtime.LambdaLogger; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; import software.amazon.awssdk.transfer.s3.model.FileUpload; @@ -13,7 +9,6 @@ import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; import java.io.BufferedWriter; -import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.nio.file.Paths; @@ -79,27 +74,7 @@ String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOu return result.toString(); } - void writeFileToFinalDestination(S3Client s3Client) { - logger.log("filename " + fileName); - long startUpload = System.currentTimeMillis(); - try { - var objectRequest = PutObjectRequest.builder() - .bucket(getBucketName()) - .key(getUploadPath() + fileName) - .build(); - - s3Client.putObject(objectRequest, RequestBody.fromFile(new File(fileFullPath))); - } catch (AmazonS3Exception ex) { - var errorMessage = "Response AttributionDataShare file cannot be created. "; - logger.log(errorMessage + ex.getMessage()); - throw new AttributionDataShareException(errorMessage, ex); - } - - long finishUpload = System.currentTimeMillis(); - logger.log("Upload TIME ms: ---------- " + (finishUpload - startUpload)); - } - - String mtpUpload(S3AsyncClient s3AsyncClient) { + public String mtpUpload(S3AsyncClient s3AsyncClient) { String currentDate = new SimpleDateFormat(REQ_FILE_NAME_PATTERN).format(new Date()); var key = REQ_FILE_NAME + currentDate; logger.log("MTPU KEY " + key); diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java deleted file mode 100644 index 96670ca..0000000 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/Utils.java +++ /dev/null @@ -1,84 +0,0 @@ -package gov.cms.ab2d.attributionDataShare; - -import com.amazonaws.services.lambda.runtime.LambdaLogger; - -import java.io.BufferedWriter; -import java.io.FileWriter; -import java.io.IOException; -import java.sql.*; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.concurrent.CountDownLatch; - -import static gov.cms.ab2d.attributionDataShare.AttributionDataShareConstants.*; - -public class Utils implements Runnable{ - String fileFullPath; - String select; - Connection connection; - BufferedWriter writer; - LambdaLogger logger; - private final CountDownLatch latch; - - public Utils(String fileFullPath, String select, Connection connection, BufferedWriter writer, CountDownLatch latch, LambdaLogger logger) { - this.fileFullPath = fileFullPath; - this.select = select; - this.connection = connection; - this.writer = writer; - this.logger = logger; - this.latch = latch; - } - - @Override - public void run() { - String date = new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(new Date()); - try (var stmt = connection.createStatement()){ - long startSelect = System.currentTimeMillis(); - var rs = getExecuteQuery(select, stmt); - long finishSelect = System.currentTimeMillis(); - - logger.log("Select TIME ms: ---------- " + (finishSelect - startSelect)); - - long startWrite = System.currentTimeMillis(); - writer.write(FIRST_LINE + date); - writer.newLine(); - long records = 0; - while (rs.next()) { - var line = getResponseLine(rs.getString(1), rs.getTimestamp(2), rs.getBoolean(3)); - writer.write(line); - writer.newLine(); - records++; - } - writer.write(LAST_LINE + date + String.format("%010d", records)); - - long finishWrite = System.currentTimeMillis(); - logger.log("Write TIME ms: ---------- " + (finishWrite - startWrite)); - } catch (SQLException | IOException ex) { - String errorMessage = "An error occurred while exporting data to a file. "; - logger.log(errorMessage + ex.getMessage()); - throw new AttributionDataShareException(errorMessage, ex); - } - finally { - latch.countDown(); - } - } - - String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOutFlag) { - var result = new StringBuilder(); - result.append(currentMbi); - // Adding spaces to the end of a string to achieve the required position index - if (currentMbi.length() < CURRENT_MBI_LENGTH) - result.append(" ".repeat(Math.max(0, CURRENT_MBI_LENGTH - currentMbi.length()))); - - if (effectiveDate != null) { - result.append(new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(effectiveDate)); - result.append((optOutFlag) ? 'Y' : 'N'); - } - return result.toString(); - } - - static ResultSet getExecuteQuery(String select, Statement statement) throws SQLException { - return statement.executeQuery(select); - } - -} From ce125aa52dd5ea9b1a8604795256af17d863aa67 Mon Sep 17 00:00:00 2001 From: smirnovaae Date: Fri, 15 Mar 2024 10:27:48 -0700 Subject: [PATCH 7/7] multi part upload --- attribution-data-file-share/build.gradle | 1 - .../AttributionDataShareHandler.java | 2 +- .../AttributionDataShareHelper.java | 15 ++------------- .../AttributionDataShareHandlerTest.java | 12 ++++++------ .../AttributionDataShareTest.java | 2 +- .../attributionDataShare/S3MockAPIExtension.java | 6 +++--- 6 files changed, 13 insertions(+), 25 deletions(-) diff --git a/attribution-data-file-share/build.gradle b/attribution-data-file-share/build.gradle index dea8149..3f600c5 100644 --- a/attribution-data-file-share/build.gradle +++ b/attribution-data-file-share/build.gradle @@ -16,7 +16,6 @@ dependencies { implementation 'software.amazon.awssdk:s3-transfer-manager:2.25.7' implementation 'software.amazon.awssdk.crt:aws-crt:0.29.11' implementation 'org.postgresql:postgresql:42.7.2' - implementation 'software.amazon.awssdk:s3:2.21.7' implementation 'software.amazon.awssdk:ssm:2.25.7' implementation 'software.amazon.awssdk:sts:2.25.6' implementation project(path: ':lambda-lib') diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java index c3ace53..311169e 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java @@ -38,7 +38,7 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())) { helper.copyDataToFile(dbConnection); - helper.mtpUpload(getAsyncS3Client(ENDPOINT, parameterStore)); + helper.uploadToS3(getAsyncS3Client(ENDPOINT, parameterStore)); } catch (NullPointerException | URISyntaxException | SQLException ex) { throwAttributionDataShareException(logger, ex); diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java index d0570a9..bb95130 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java @@ -33,13 +33,9 @@ void copyDataToFile(Connection connection) { String date = new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(new Date()); try (var stmt = connection.createStatement(); var writer = new BufferedWriter(new FileWriter(fileFullPath, true))) { - long startSelect = System.currentTimeMillis(); - var rs = getExecuteQuery(stmt); - long finishSelect = System.currentTimeMillis(); - logger.log("Select TIME ms: ---------- " + (finishSelect - startSelect)); + var rs = getExecuteQuery(stmt); - long startWrite = System.currentTimeMillis(); writer.write(FIRST_LINE + date); writer.newLine(); long records = 0; @@ -51,8 +47,6 @@ void copyDataToFile(Connection connection) { } writer.write(LAST_LINE + date + String.format("%010d", records)); - long finishWrite = System.currentTimeMillis(); - logger.log("Write TIME ms: ---------- " + (finishWrite - startWrite)); } catch (SQLException | IOException ex) { String errorMessage = "An error occurred while exporting data to a file. "; logger.log(errorMessage + ex.getMessage()); @@ -74,11 +68,9 @@ String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOu return result.toString(); } - public String mtpUpload(S3AsyncClient s3AsyncClient) { + public String uploadToS3(S3AsyncClient s3AsyncClient) { String currentDate = new SimpleDateFormat(REQ_FILE_NAME_PATTERN).format(new Date()); var key = REQ_FILE_NAME + currentDate; - logger.log("MTPU KEY " + key); - long startUpload = System.currentTimeMillis(); S3TransferManager transferManager = S3TransferManager.builder() .s3Client(s3AsyncClient) .build(); @@ -92,9 +84,6 @@ public String mtpUpload(S3AsyncClient s3AsyncClient) { FileUpload fileUpload = transferManager.uploadFile(uploadFileRequest); CompletedFileUpload uploadResult = fileUpload.completionFuture().join(); - - long finishUpload = System.currentTimeMillis(); - logger.log("Multipart Upload TIME ms: ---------- " + (finishUpload - startUpload)); return uploadResult.response().eTag(); } diff --git a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java index 6869c65..aa79543 100644 --- a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java +++ b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java @@ -29,14 +29,14 @@ class AttributionDataShareHandlerTest { @Test void attributionDataShareInvoke() { - var mockParameterStore = mockStatic(AttributionParameterStore.class); + var mockParameterStore = mockStatic(AttributionParameterStore.class); mockParameterStore .when(AttributionParameterStore::getParameterStore) .thenReturn(parameterStore); Connection dbConnection = mock(Connection.class); mockStatic(DriverManager.class) - .when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection); + .when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection); when(handler.helperInit(anyString(), anyString(), any(LambdaLogger.class))).thenReturn(helper); assertDoesNotThrow(() -> handler.handleRequest(null, System.out, new TestContext())); @@ -49,8 +49,8 @@ void attributionDataShareExceptionTest() { assertThrows(AttributionDataShareException.class, () -> handler.throwAttributionDataShareException(LOGGER, ex)); } -// @Test -// void getS3ClientTest() throws URISyntaxException { -// assertNotNull(handler.getS3Client(TEST_ENDPOINT, parameterStore)); -// } + @Test + void getS3ClientTest() throws URISyntaxException { + assertNotNull(handler.getAsyncS3Client(TEST_ENDPOINT, parameterStore)); + } } diff --git a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java index 6193804..435d814 100644 --- a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java +++ b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java @@ -78,7 +78,7 @@ void getResponseLineTest() { @Test void writeFileToFinalDestinationTest() throws IOException { createTestFile(); - helper.writeFileToFinalDestination(S3MockAPIExtension.S3_CLIENT); + helper.uploadToS3(S3MockAPIExtension.S3_CLIENT); assertTrue(S3MockAPIExtension.isObjectExists(FILE_NAME)); S3MockAPIExtension.deleteFile(FILE_NAME); } diff --git a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java index 962491a..2ca03be 100644 --- a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java +++ b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java @@ -3,7 +3,7 @@ import io.findify.s3mock.S3Mock; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.*; import java.net.URI; @@ -14,7 +14,7 @@ public class S3MockAPIExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource { private static final S3Mock API = S3Mock.create(8001, "/tmp/s3"); - public static S3Client S3_CLIENT; + public static S3AsyncClient S3_CLIENT; private static boolean STARTED = false; @Override @@ -25,7 +25,7 @@ public void beforeAll(ExtensionContext context) throws Exception { API.start(); - S3_CLIENT = S3Client.builder() + S3_CLIENT = S3AsyncClient.crtBuilder() .region(S3_REGION) .endpointOverride(new URI(TEST_ENDPOINT)) .build();