Skip to content

Commit

Permalink
Fix file uploading
Browse files — browse the repository at this point in the history
  • Loading branch information
kaklakariada committed Oct 9, 2023
1 parent c94cda2 commit 85f29db
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
10 changes: 6 additions & 4 deletions src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
val dataFormat: String

var outputDirectory: Path = _
val paths: List[HPath] = List()
var paths: List[HPath] = List()
val baseFileName = "part-"

override final def beforeEach(): Unit = {
outputDirectory = createTemporaryFolder(s"$dataFormat-tests-")
()
}

override final def afterEach(): Unit =
override final def afterEach(): Unit = {
paths = List()
deletePathFiles(outputDirectory)
}

override def beforeAll(): Unit = {
super.beforeAll()
createBucket(bucketName)
prepareExasolDatabase(schemaName)
createS3ConnectionObject()
}
Expand All @@ -39,7 +42,7 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
def addFile(): HPath = {
val fileCounter = String.format("%04d", paths.length)
val newPath = new HPath(outputDirectory.toUri.toString, s"$baseFileName$fileCounter.$dataFormat")
paths.appended(newPath)
paths = paths.appended(newPath)
return newPath
}

Expand Down Expand Up @@ -68,5 +71,4 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
()
}
}

}
17 changes: 13 additions & 4 deletions src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ trait BaseS3IntegrationTest extends BaseIntegrationTest {
def getAWSSecretKey(): String = s3Container.getSecretKey()

def uploadFileToS3(bucket: String, file: HPath): Unit = {
createBucket(bucket)
logger.info(s"Uploading file $file to bucket $bucket")
val request = new PutObjectRequest(bucket, file.getName(), new File(file.toUri()))
s3.putObject(request)
()
Expand All @@ -94,19 +94,28 @@ trait BaseS3IntegrationTest extends BaseIntegrationTest {
()
}

def importFromS3IntoExasol(schemaName: String, table: Table, bucket: String, file: String, dataFormat: String): Unit =
def importFromS3IntoExasol(
schemaName: String,
table: Table,
bucket: String,
file: String,
dataFormat: String
): Unit = {
val bucketPath = s"s3a://$bucket/$file"
logger.info(s"Importing $bucketPath of format $dataFormat into table ${table.getFullyQualifiedName()}...")
executeStmt(
s"""|IMPORT INTO ${table.getFullyQualifiedName()}
|FROM SCRIPT $schemaName.IMPORT_PATH WITH
|BUCKET_PATH = 's3a://$bucket/$file'
|BUCKET_PATH = '$bucketPath'
|DATA_FORMAT = '$dataFormat'
|S3_ENDPOINT = '$s3Endpoint'
|S3_CHANGE_DETECTION_MODE = 'none'
|TRUNCATE_STRING = 'true'
|CONNECTION_NAME = 'S3_CONNECTION'
|PARALLELISM = 'nproc()';
""".stripMargin
""".stripMargin
)
}

def exportIntoS3(schemaName: String, tableName: String, bucket: String): Unit =
executeStmt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class ParquetDataImporterIT extends BaseDataImporter {
)
}

test("imports multiple columns") {
test("imports multiple columns from single file") {
MultiParquetChecker(
"required binary name (UTF8); required int32 age;",
Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
Expand All @@ -487,9 +487,9 @@ class ParquetDataImporterIT extends BaseDataImporter {
writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
}
.assertResultSet(
table("VARCHAR", "DECIMAL")
.row("John", 24)
.row("Jane", 22)
table("VARCHAR", "BIGINT")
.row("John", 24L)
.row("Jane", 22L)
.matches()
)
}
Expand Down

0 comments on commit 85f29db

Please sign in to comment.