diff --git a/src/main/scala/datasources/PdfPartitionedFileUtil.scala b/src/main/scala/datasources/PdfPartitionedFileUtil.scala
index 702c34a..d746ed6 100644
--- a/src/main/scala/datasources/PdfPartitionedFileUtil.scala
+++ b/src/main/scala/datasources/PdfPartitionedFileUtil.scala
@@ -1,15 +1,11 @@
 package com.stabrise.sparkpdf
 package datasources
 
-//import scala.reflect.runtime.universe
-//import scala.reflect.runtime.universe._
-
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 import org.apache.pdfbox.pdmodel.PDDocument
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.FileStatusWithMetadata
 
@@ -23,30 +19,24 @@ object PdfPartitionedFileUtil {
       partitionValues: InternalRow): Seq[PartitionedFile] = {
     val path = filePath
     val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-    val status = fs.getFileStatus(filePath)
 
     // Load the PDF document
     val document = PDDocument.load(fs.open(file.getPath))
     val page_num = document.getNumberOfPages
     document.close()
-    //println("Page number scan: " + page_num)
+
     (0L until page_num by maxSplitBytes).map { offset =>
       val remaining = page_num - offset
       val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
       val hosts = getBlockHosts(getBlockLocations(file), offset, size)
-      //val hosts = PartitionedFileUtil.getBlockHosts(PartitionedFileUtil.getBlockLocations(file), offset, size)
-      //val h = hosts.asInstanceOf[scala.collection.immutable.Iterable[String]]
-//      typeOf[PartitionedFile].members.filter(!_.isMethod).filter(_.name.toTermName.toString == "locations").foreach { i =>
-//        println(i.name, i.typeSignature)
-//      }
-      println(partitionValues)
-      println(SparkPath.fromPath(filePath))
-      println(SparkPath.fromPath(file.getPath))
-      println(offset)
-      println(size)
-      println(page_num)
-      PartitionedFile(partitionValues=partitionValues, filePath=SparkPath.fromPath(file.getPath), start=offset, length = size,
-        modificationTime=file.getModificationTime, fileSize=page_num.toLong)
+      PartitionedFile(
+        partitionValues=partitionValues,
+        filePath=SparkPath.fromPath(file.getPath),
+        start=offset,
+        length=size,
+        locations=hosts,
+        modificationTime=file.getModificationTime,
+        fileSize=page_num.toLong)
     }
   }
 
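
Note on the split arithmetic above: the byte-oriented `maxSplitBytes` parameter is reused here as a page count, so each resulting `PartitionedFile` covers a run of PDF pages rather than a byte range, and the last chunk is clamped to the pages that remain. A minimal standalone sketch of just that loop follows; the names `PageSplitSketch`, `pageNum`, and `maxSplitPages` are illustrative stand-ins (for `PDDocument.getNumberOfPages` and the `maxSplitBytes` parameter), not part of the patch.

```scala
// Standalone sketch of the page-range splitting used in the patched loop.
// pageNum and maxSplitPages are made-up values for illustration only.
object PageSplitSketch extends App {
  val pageNum: Long = 10L       // e.g. a 10-page PDF
  val maxSplitPages: Long = 4L  // maximum pages per split

  // Step through page offsets and clamp the final chunk to the pages left.
  val splits = (0L until pageNum by maxSplitPages).map { offset =>
    val remaining = pageNum - offset
    val size = if (remaining > maxSplitPages) maxSplitPages else remaining
    (offset, size)
  }

  println(splits.mkString(", ")) // (0,4), (4,4), (8,2)
}
```

With `locations=hosts` now passed explicitly, each of these per-page-range splits also carries the block hosts computed for its offset and length.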