Skip to content

Commit

Permalink
Updated PdfPartitionedFileUtil
Browse files: browse the repository at this point in the history
  • Loading branch information
mykolamelnykml committed Dec 3, 2024
1 parent 9bbc75b commit 4e6e4d6
Showing 1 changed file with 9 additions and 19 deletions.
28 changes: 9 additions & 19 deletions src/main/scala/datasources/PdfPartitionedFileUtil.scala
Original file line number | Diff line number | Diff line change
@@ -1,15 +1,11 @@
package com.stabrise.sparkpdf
package datasources

//import scala.reflect.runtime.universe
//import scala.reflect.runtime.universe._

import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.FileStatusWithMetadata

Expand All @@ -23,30 +19,24 @@ object PdfPartitionedFileUtil {
partitionValues: InternalRow): Seq[PartitionedFile] = {
val path = filePath
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
val status = fs.getFileStatus(filePath)

// Load the PDF document
val document = PDDocument.load(fs.open(file.getPath))
val page_num = document.getNumberOfPages
document.close()
//println("Page number scan: " + page_num)

(0L until page_num by maxSplitBytes).map { offset =>
val remaining = page_num - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
//val hosts = PartitionedFileUtil.getBlockHosts(PartitionedFileUtil.getBlockLocations(file), offset, size)
//val h = hosts.asInstanceOf[scala.collection.immutable.Iterable[String]]
// typeOf[PartitionedFile].members.filter(!_.isMethod).filter(_.name.toTermName.toString == "locations").foreach { i =>
// println(i.name, i.typeSignature)
// }
println(partitionValues)
println(SparkPath.fromPath(filePath))
println(SparkPath.fromPath(file.getPath))
println(offset)
println(size)
println(page_num)
PartitionedFile(partitionValues=partitionValues, filePath=SparkPath.fromPath(file.getPath), start=offset, length = size,
modificationTime=file.getModificationTime, fileSize=page_num.toLong)
PartitionedFile(
partitionValues=partitionValues,
filePath=SparkPath.fromPath(file.getPath),
start=offset,
length=size,
locations=hosts,
modificationTime=file.getModificationTime,
fileSize=page_num.toLong)
}
}

Expand Down

0 comments on commit 4e6e4d6

Please sign in to comment.