From 85f29db703d65e45770644c715e8d7b2dcd3ec88 Mon Sep 17 00:00:00 2001
From: Christoph Pirkl
Date: Mon, 9 Oct 2023 08:46:32 +0000
Subject: [PATCH] Fix file uploading

---
 .../exasol/cloudetl/it/BaseDataImporter.scala | 10 ++++++----
 .../cloudetl/it/BaseS3IntegrationTest.scala   | 17 +++++++++++++----
 .../it/parquet/ParquetDataImporterIT.scala    |  8 ++++----
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
index e9ee786e..d447ea21 100644
--- a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
@@ -14,7 +14,7 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
   val dataFormat: String
 
   var outputDirectory: Path = _
-  val paths: List[HPath] = List()
+  var paths: List[HPath] = List()
   val baseFileName = "part-"
 
   override final def beforeEach(): Unit = {
@@ -22,11 +22,14 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
     ()
   }
 
-  override final def afterEach(): Unit =
+  override final def afterEach(): Unit = {
+    paths = List()
     deletePathFiles(outputDirectory)
+  }
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    createBucket(bucketName)
     prepareExasolDatabase(schemaName)
     createS3ConnectionObject()
   }
@@ -39,7 +42,7 @@
   def addFile(): HPath = {
     val fileCounter = String.format("%04d", paths.length)
     val newPath = new HPath(outputDirectory.toUri.toString, s"$baseFileName$fileCounter.$dataFormat")
-    paths.appended(newPath)
+    paths = paths.appended(newPath)
     return newPath
   }
 
@@ -68,5 +71,4 @@
       ()
     }
   }
-
 }
diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala b/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala
index c1dd6c9a..f7f2f377 100644
--- a/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala
@@ -83,7 +83,7 @@ trait BaseS3IntegrationTest extends BaseIntegrationTest {
   def getAWSSecretKey(): String = s3Container.getSecretKey()
 
   def uploadFileToS3(bucket: String, file: HPath): Unit = {
-    createBucket(bucket)
+    logger.info(s"Uploading file $file to bucket $bucket")
     val request = new PutObjectRequest(bucket, file.getName(), new File(file.toUri()))
     s3.putObject(request)
     ()
@@ -94,19 +94,28 @@
     ()
   }
 
-  def importFromS3IntoExasol(schemaName: String, table: Table, bucket: String, file: String, dataFormat: String): Unit =
+  def importFromS3IntoExasol(
+      schemaName: String,
+      table: Table,
+      bucket: String,
+      file: String,
+      dataFormat: String
+  ): Unit = {
+    val bucketPath = s"s3a://$bucket/$file"
+    logger.info(s"Importing $bucketPath of format $dataFormat into table ${table.getFullyQualifiedName()}...")
     executeStmt(
       s"""|IMPORT INTO ${table.getFullyQualifiedName()}
          |FROM SCRIPT $schemaName.IMPORT_PATH WITH
-         |BUCKET_PATH = 's3a://$bucket/$file'
+         |BUCKET_PATH = '$bucketPath'
          |DATA_FORMAT = '$dataFormat'
          |S3_ENDPOINT = '$s3Endpoint'
          |S3_CHANGE_DETECTION_MODE = 'none'
          |TRUNCATE_STRING = 'true'
          |CONNECTION_NAME = 'S3_CONNECTION'
          |PARALLELISM = 'nproc()';
-      """.stripMargin
+          """.stripMargin
     )
+  }
 
   def exportIntoS3(schemaName: String, tableName: String, bucket: String): Unit =
     executeStmt(
diff --git a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
index 27813ef7..5aa892b3 100644
--- a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
@@ -476,7 +476,7 @@ class ParquetDataImporterIT extends BaseDataImporter {
     )
   }
 
-  test("imports multiple columns") {
+  test("imports multiple columns from single file") {
     MultiParquetChecker(
       "required binary name (UTF8); required int32 age;",
       Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
@@ -487,9 +487,9 @@
       writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
     }
       .assertResultSet(
-        table("VARCHAR", "DECIMAL")
-          .row("John", 24)
-          .row("Jane", 22)
+        table("VARCHAR", "BIGINT")
+          .row("John", 24L)
+          .row("Jane", 22L)
           .matches()
       )
   }
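
Note: the core bug this patch fixes is that `paths` in `BaseDataImporter` was an immutable `val`, so `paths.appended(newPath)` built a new list and immediately discarded it; `addFile()` returned the new path, but `paths` stayed empty and no files were uploaded. A minimal standalone sketch of the pitfall, with hypothetical names not taken from the patch:

```scala
object AppendedPitfall extends App {
  // Scala's immutable List: `appended` returns a NEW list and leaves
  // the receiver untouched, so the result must be kept.
  val before: List[String] = List()
  before.appended("part-0000.parquet") // new list silently discarded
  println(before.length)               // 0 -- the pre-fix behavior

  // The fix: bind with `var` and reassign, as the patch does.
  var paths: List[String] = List()
  paths = paths.appended("part-0000.parquet")
  println(paths)                       // List(part-0000.parquet)
}
```

The `afterEach` reset of `paths` matters for the same reason: the list now actually accumulates entries in the shared trait, so leftovers from one test would shift the zero-padded `part-NNNN` counter in the next.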