From 37949ee76ea12961c632dbefe49e376645986c23 Mon Sep 17 00:00:00 2001 From: Anna Smirnova <132938234+smirnovaae@users.noreply.github.com> Date: Fri, 15 Mar 2024 12:27:01 -0700 Subject: [PATCH] Ab2d-6030/Multipart upload to S3 (#94) --- attribution-data-file-share/build.gradle | 3 +- .../AttributionDataShareHandler.java | 24 ++++++----- .../AttributionDataShareHelper.java | 43 +++++++++++-------- .../AttributionDataShareHandlerTest.java | 6 +-- .../AttributionDataShareTest.java | 2 +- .../S3MockAPIExtension.java | 6 +-- 6 files changed, 49 insertions(+), 35 deletions(-) diff --git a/attribution-data-file-share/build.gradle b/attribution-data-file-share/build.gradle index 37312ec..3f600c5 100644 --- a/attribution-data-file-share/build.gradle +++ b/attribution-data-file-share/build.gradle @@ -13,8 +13,9 @@ repositories { dependencies { implementation 'com.amazonaws:aws-lambda-java-core:1.2.2' implementation 'com.amazonaws:aws-java-sdk-s3:1.12.529' + implementation 'software.amazon.awssdk:s3-transfer-manager:2.25.7' + implementation 'software.amazon.awssdk.crt:aws-crt:0.29.11' implementation 'org.postgresql:postgresql:42.7.2' - implementation 'software.amazon.awssdk:s3:2.21.7' implementation 'software.amazon.awssdk:ssm:2.25.7' implementation 'software.amazon.awssdk:sts:2.25.6' implementation project(path: ':lambda-lib') diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java index 81a21c1..311169e 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandler.java @@ -4,7 +4,7 @@ import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import gov.cms.ab2d.lambdalibs.lib.FileUtil; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; @@ -12,7 +12,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Paths; import java.sql.DriverManager; @@ -36,10 +35,10 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co String fileFullPath = FILE_PATH + fileName; var parameterStore = AttributionParameterStore.getParameterStore(); AttributionDataShareHelper helper = helperInit(fileName, fileFullPath, logger); - try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())){ + try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())) { helper.copyDataToFile(dbConnection); - helper.writeFileToFinalDestination(getS3Client(ENDPOINT, parameterStore)); + helper.uploadToS3(getAsyncS3Client(ENDPOINT, parameterStore)); } catch (NullPointerException | URISyntaxException | SQLException ex) { throwAttributionDataShareException(logger, ex); @@ -49,10 +48,8 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co } } - public S3Client getS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException { - var client = S3Client.builder() - .region(S3_REGION) - .endpointOverride(new URI(endpoint)); + public S3AsyncClient getAsyncS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException { + var client = S3AsyncClient.crtCreate(); if (endpoint.equals(ENDPOINT)) { var stsClient = StsClient @@ -72,10 +69,17 @@ public S3Client getS3Client(String endpoint, AttributionParameterStore parameter .refreshRequest(request) .build(); - client.credentialsProvider(credentials); + client = + S3AsyncClient.crtBuilder() + .credentialsProvider(credentials) + .region(S3_REGION) + .targetThroughputInGbps(20.0) + .minimumPartSizeInBytes(8 * 1025 * 1024L) + .build(); } - return client.build(); + return client; } + AttributionDataShareHelper helperInit(String fileName, String fileFullPath, LambdaLogger logger) { return new AttributionDataShareHelper(fileName, fileFullPath, logger); } diff --git a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java index b05f25e..bb95130 100644 --- a/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java +++ b/attribution-data-file-share/src/main/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHelper.java @@ -1,15 +1,17 @@ package gov.cms.ab2d.attributionDataShare; import com.amazonaws.services.lambda.runtime.LambdaLogger; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; +import software.amazon.awssdk.transfer.s3.model.FileUpload; +import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; import java.io.BufferedWriter; -import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.nio.file.Paths; import java.sql.*; import java.text.SimpleDateFormat; import java.util.Date; @@ -31,7 +33,9 @@ void copyDataToFile(Connection connection) { String date = new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(new Date()); try (var stmt = connection.createStatement(); var writer = new BufferedWriter(new FileWriter(fileFullPath, true))) { + var rs = getExecuteQuery(stmt); + writer.write(FIRST_LINE + date); writer.newLine(); long records = 0; @@ -42,6 +46,7 @@ void copyDataToFile(Connection connection) { records++; } writer.write(LAST_LINE + date + String.format("%010d", records)); + } catch (SQLException | IOException ex) { String errorMessage = "An error occurred while exporting data to a file. "; logger.log(errorMessage + ex.getMessage()); @@ -63,19 +68,23 @@ String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOu return result.toString(); } - void writeFileToFinalDestination(S3Client s3Client) { - try { - var objectRequest = PutObjectRequest.builder() - .bucket(getBucketName()) - .key(getUploadPath() + fileName) - .build(); + public String uploadToS3(S3AsyncClient s3AsyncClient) { + String currentDate = new SimpleDateFormat(REQ_FILE_NAME_PATTERN).format(new Date()); + var key = REQ_FILE_NAME + currentDate; + S3TransferManager transferManager = S3TransferManager.builder() + .s3Client(s3AsyncClient) + .build(); - s3Client.putObject(objectRequest, RequestBody.fromFile(new File(fileFullPath))); - } catch (AmazonS3Exception ex) { - var errorMessage = "Response AttributionDataShare file cannot be created. "; - logger.log(errorMessage + ex.getMessage()); - throw new AttributionDataShareException(errorMessage, ex); - } + UploadFileRequest uploadFileRequest = UploadFileRequest.builder() + .putObjectRequest(b -> b.bucket(getBucketName()).key(getUploadPath() + key)) + .addTransferListener(LoggingTransferListener.create()) + .source(Paths.get(fileFullPath)) + .build(); + + FileUpload fileUpload = transferManager.uploadFile(uploadFileRequest); + + CompletedFileUpload uploadResult = fileUpload.completionFuture().join(); + return uploadResult.response().eTag(); } public String getBucketName() { diff --git a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java index 56231de..aa79543 100644 --- a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java +++ b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareHandlerTest.java @@ -29,14 +29,14 @@ class AttributionDataShareHandlerTest { @Test void attributionDataShareInvoke() { - var mockParameterStore = mockStatic(AttributionParameterStore.class); + var mockParameterStore = mockStatic(AttributionParameterStore.class); mockParameterStore .when(AttributionParameterStore::getParameterStore) .thenReturn(parameterStore); Connection dbConnection = mock(Connection.class); mockStatic(DriverManager.class) - .when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection); + .when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection); when(handler.helperInit(anyString(), anyString(), any(LambdaLogger.class))).thenReturn(helper); assertDoesNotThrow(() -> handler.handleRequest(null, System.out, new TestContext())); @@ -51,6 +51,6 @@ void attributionDataShareExceptionTest() { @Test void getS3ClientTest() throws URISyntaxException { - assertNotNull(handler.getS3Client(TEST_ENDPOINT, parameterStore)); + assertNotNull(handler.getAsyncS3Client(TEST_ENDPOINT, parameterStore)); } } diff --git a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java index 6193804..435d814 100644 --- a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java +++ b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/AttributionDataShareTest.java @@ -78,7 +78,7 @@ void getResponseLineTest() { @Test void writeFileToFinalDestinationTest() throws IOException { createTestFile(); - helper.writeFileToFinalDestination(S3MockAPIExtension.S3_CLIENT); + helper.uploadToS3(S3MockAPIExtension.S3_CLIENT); assertTrue(S3MockAPIExtension.isObjectExists(FILE_NAME)); S3MockAPIExtension.deleteFile(FILE_NAME); } diff --git a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java index 962491a..2ca03be 100644 --- a/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java +++ b/attribution-data-file-share/src/test/java/gov/cms/ab2d/attributionDataShare/S3MockAPIExtension.java @@ -3,7 +3,7 @@ import io.findify.s3mock.S3Mock; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.*; import java.net.URI; @@ -14,7 +14,7 @@ public class S3MockAPIExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource { private static final S3Mock API = S3Mock.create(8001, "/tmp/s3"); - public static S3Client S3_CLIENT; + public static S3AsyncClient S3_CLIENT; private static boolean STARTED = false; @Override @@ -25,7 +25,7 @@ public void beforeAll(ExtensionContext context) throws Exception { API.start(); - S3_CLIENT = S3Client.builder() + S3_CLIENT = S3AsyncClient.crtBuilder() .region(S3_REGION) .endpointOverride(new URI(TEST_ENDPOINT)) .build();