Allow importing multiple files
kaklakariada committed Oct 9, 2023
1 parent f815782 commit 91755ff
Showing 5 changed files with 19 additions and 8 deletions.
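The change in a nutshell: BaseDataImporter previously created one fixed file (part-00000.<format>) in beforeEach and imported it by exact name. It now keeps a list of files, names them part-0000.<format>, part-0001.<format>, ... through a new addFile() helper, uploads all of them to S3, and imports them via the shared "part-" base name, so a single test can stage several input files. A minimal sketch of the resulting flow, assuming the helpers shown in the diff below (writeDataValues is format-specific; `schema` and the values are placeholders):

    // Sketch only: addFile() registers part-000N.<format> and returns its path.
    val first = addFile()
    val second = addFile()
    writeDataValues(List(1, 2), first, schema)  // format-specific writer
    writeDataValues(List(3, 4), second, schema)
    // withResultSet() then uploads every registered path to S3 and runs a single
    // import against the shared "part-" name (presumably matched as a prefix by
    // the import helper).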
15 changes: 11 additions & 4 deletions src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
@@ -14,11 +14,11 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
val dataFormat: String

var outputDirectory: Path = _
- var path: HPath = _
+ var paths: List[HPath] = List()
+ val baseFileName = "part-"

override final def beforeEach(): Unit = {
outputDirectory = createTemporaryFolder(s"$dataFormat-tests-")
- path = new HPath(outputDirectory.toUri.toString, s"part-00000.$dataFormat")
()
}

@@ -36,20 +36,27 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
super.afterAll()
}

+ def addFile(): HPath = {
+   val fileCounter = String.format("%04d", paths.length)
+   val newPath = new HPath(outputDirectory.toUri.toString, s"$baseFileName$fileCounter.$dataFormat")
+   // appended returns a new list; reassign so the new file is actually tracked
+   paths = paths.appended(newPath)
+   newPath
+ }
+
abstract class AbstractChecker(exaColumnType: String, tableName: String)
extends AbstractMultiColChecker(Map("COLUMN" -> exaColumnType), tableName)

abstract class AbstractMultiColChecker(columns: Map[String, String], tableName: String) {
def withResultSet(block: ResultSet => Unit): this.type = {
- uploadFileToS3(bucketName, path)
+ paths.foreach(path => uploadFileToS3(bucketName, path))
val tableBuilder = schema
.createTableBuilder(tableName.toUpperCase(java.util.Locale.ENGLISH))
columns.foreach { case (colName, colType) =>
tableBuilder.column(colName, colType)
}

val table = tableBuilder.build()
- importFromS3IntoExasol(schemaName, table, bucketName, path.getName(), dataFormat)
+ importFromS3IntoExasol(schemaName, table, bucketName, baseFileName, dataFormat)
val rs = executeQuery(s"SELECT * FROM ${table.getFullyQualifiedName()}")
block(rs)
rs.close()
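For orientation, a hedged example of how a checker built on AbstractMultiColChecker is driven; the checker value and row count below are illustrative, not part of this commit:

    // Illustrative only: `checker` stands for any concrete checker subclass.
    checker.withResultSet { rs =>
      var rows = 0
      while (rs.next()) rows += 1
      assert(rows == 4) // e.g. two staged files with two rows each
    }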
@@ -10,7 +10,6 @@ import com.exasol.dbbuilder.dialects.exasol.ExasolSchema
import com.exasol.dbbuilder.dialects.exasol.udf.UdfScript

import com.typesafe.scalalogging.LazyLogging
- import org.apache.derby.client.am.SqlException
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

@@ -331,6 +331,7 @@ class AvroDataImporterIT extends BaseDataImporter {
val avroSchema = new Schema.Parser().parse(avroSchemaStr)

def withInputValues[T](values: List[T]): AvroChecker = {
+ val path = addFile()
writeDataValues(values, path, avroSchema)
this
}
@@ -230,6 +230,7 @@ class OrcDataImporterIT extends BaseDataImporter {
test("imports union") {
val orcType = "struct<f:uniontype<int,string>>"
val orcSchema = TypeDescription.fromString(orcType)
+ val path = addFile()
val writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(orcSchema))
val batch = orcSchema.createRowBatch()
batch.size = 3
@@ -262,6 +263,7 @@
val orcSchema = TypeDescription.fromString(orcColumn)

def withInputValues[T](values: List[T]): OrcChecker = {
+ val path = addFile()
writeDataValues(values, path, orcSchema)
this
}
@@ -482,12 +482,12 @@ class ParquetDataImporterIT extends BaseDataImporter {
Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
"multi_col"
)
- .withWriter { case (writer, schema) =>
+ .addParquetFile { case (writer, schema) =>
writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24))
writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
}
.assertResultSet(
- table()
+ table("VARCHAR", "DECIMAL")
.row("John", 24)
.row("Jane", 22)
.matches()
@@ -507,6 +507,7 @@
}

def withInputValues[T](values: List[T]): ParquetChecker = {
+ val path = addFile()
writeDataValues(values, path, parquetSchema)
this
}
@@ -517,7 +518,8 @@
with ParquetTestDataWriter {
private val parquetSchema = MessageTypeParser.parseMessageType(s"message test { $parquetColumn }")

- def withWriter(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker = {
+ def addParquetFile(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker = {
+   val path = addFile()
val writer = getParquetWriter(path, parquetSchema, true)
block(writer, parquetSchema)
writer.close()
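With the rename, a test can now chain addParquetFile to stage several Parquet files for one import. A sketch in the style of the multi_col test above; the constructor arguments, column spec, and expected rows are assumptions, not code from this commit:

    // Hypothetical test: two Parquet files, one IMPORT, one result set.
    MultiParquetChecker("required binary name (UTF8);", Map("NAME" -> "VARCHAR(60)"), "multi_file")
      .addParquetFile { case (writer, schema) =>
        writer.write(new SimpleGroup(schema).append("name", "John"))
      }
      .addParquetFile { case (writer, schema) =>
        writer.write(new SimpleGroup(schema).append("name", "Jane"))
      }
      .assertResultSet(table("VARCHAR").row("John").row("Jane").matches())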
