Skip to content

Commit

Permalink
Remove redundant UUID definitions.
Browse files Browse the repository at this point in the history
Remove Murmur hashing of UUIDs.
Add a check for tests to warning prints.
Fix data types in MergeAgg and RasterExpressionSerialization.
  • Loading branch information
milos.colic committed Oct 13, 2023
1 parent bef9f04 commit 7170697
Show file tree
Hide file tree
Showing 14 changed files with 31 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ abstract class MosaicRaster(

def getDimensions: (Int, Int)

def uuid: Long

def getExtension: String

def getPath: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import org.apache.spark.sql.types.{BinaryType, DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.gdal.gdal.gdal

import java.util.UUID

/**
* A base class for all Raster APIs.
* @param reader
Expand Down Expand Up @@ -45,7 +47,7 @@ abstract class RasterAPI(reader: RasterReader) extends Serializable {
rasterDT match {
case StringType =>
val extension = raster.getRaster.GetDriver().GetMetadataItem("DMD_EXTENSION")
val writePath = s"$checkpointPath/${raster.uuid}.$extension"
val writePath = s"$checkpointPath/${UUID.randomUUID()}.$extension"
val outPath = raster.writeToPath(writePath)
RasterCleaner.dispose(raster)
UTF8String.fromString(outPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.databricks.labs.mosaic.core.raster._
import com.databricks.labs.mosaic.core.raster.operator.clip.RasterClipByVector
import com.databricks.labs.mosaic.core.types.model.GeometryTypeEnum.POLYGON
import com.databricks.labs.mosaic.utils.PathUtils
import org.apache.orc.util.Murmur3
import org.gdal.gdal.gdal.GDALInfo
import org.gdal.gdal.{Dataset, InfoOptions, gdal}
import org.gdal.gdalconst.gdalconstConstants._
Expand All @@ -24,7 +23,6 @@ import scala.util.Try
/** GDAL implementation of the MosaicRaster trait. */
//noinspection DuplicatedCode
class MosaicRasterGDAL(
_uuid: Long,
var raster: Dataset,
path: String,
isTemp: Boolean,
Expand Down Expand Up @@ -211,7 +209,7 @@ class MosaicRasterGDAL(
override def getMemSize: Long = {
if (memSize == -1) {
if (PathUtils.isInMemory(path)) {
val tempPath = PathUtils.createTmpFilePath(this.uuid.toString, getExtension)
val tempPath = PathUtils.createTmpFilePath(getExtension)
writeToPath(tempPath)
this.refresh()
val size = Files.size(Paths.get(tempPath))
Expand Down Expand Up @@ -260,7 +258,7 @@ class MosaicRasterGDAL(
if (PathUtils.isInMemory(path)) {
// Create a temporary directory to store the raster
// This is needed because Files cannot read from /vsimem/ directly
val path = PathUtils.createTmpFilePath(uuid.toString, getExtension)
val path = PathUtils.createTmpFilePath(getExtension)
writeToPath(path, destroy)
val byteArray = Files.readAllBytes(Paths.get(path))
Files.delete(Paths.get(path))
Expand Down Expand Up @@ -290,8 +288,6 @@ class MosaicRasterGDAL(
this.raster = openRaster(path)
}

override def uuid: Long = _uuid

override def getExtension: String = {
val driver = gdal.GetDriverByName(driverShortName)
val extension = driver.GetMetadataItem("DMD_EXTENSION")
Expand Down Expand Up @@ -320,7 +316,7 @@ class MosaicRasterGDAL(
}

override def asTemp: MosaicRaster = {
val temp = PathUtils.createTmpFilePath(uuid.toString, getExtension)
val temp = PathUtils.createTmpFilePath(getExtension)
writeToPath(temp)
if (PathUtils.isInMemory(path)) RasterCleaner.dispose(this)
else this.destroy()
Expand Down Expand Up @@ -375,15 +371,13 @@ object MosaicRasterGDAL extends RasterReader {
driverShortName: String,
memSize: Long
): MosaicRasterGDAL = {
val uuid = Murmur3.hash64(path.getBytes())
val raster = new MosaicRasterGDAL(uuid, dataset, path, isTemp, parentPath, driverShortName, memSize)
val raster = new MosaicRasterGDAL(dataset, path, isTemp, parentPath, driverShortName, memSize)
raster
}

def apply(path: String, isTemp: Boolean, parentPath: String, driverShortName: String, memSize: Long): MosaicRasterGDAL = {
val uuid = Murmur3.hash64(path.getBytes())
val dataset = openRaster(path, driverShortName)
val raster = new MosaicRasterGDAL(uuid, dataset, path, isTemp, parentPath, driverShortName, memSize)
val raster = new MosaicRasterGDAL(dataset, path, isTemp, parentPath, driverShortName, memSize)
raster
}

Expand All @@ -403,8 +397,6 @@ object MosaicRasterGDAL extends RasterReader {
val isSubdataset = PathUtils.isSubdataset(inPath)
val localCopy = if (readDirect) inPath else PathUtils.copyToTmp(inPath)
val path = PathUtils.getCleanPath(localCopy, localCopy.endsWith(".zip"))

val uuid = Murmur3.hash64(path.getBytes())
val readPath =
if (isSubdataset) PathUtils.getSubdatasetPath(path)
else PathUtils.getZipPath(path)
Expand All @@ -414,18 +406,17 @@ object MosaicRasterGDAL extends RasterReader {
// It will be available when the raster is serialized for next operation
// If value is needed then it will be computed when getMemSize is called
// We cannot just use memSize value of the parent due to the fact that the raster could be a subdataset
val raster = new MosaicRasterGDAL(uuid, dataset, path, true, parentPath, driverShortName, -1)
val raster = new MosaicRasterGDAL(dataset, path, true, parentPath, driverShortName, -1)
raster
}

override def readRaster(contentBytes: Array[Byte], parentPath: String, driverShortName: String): MosaicRaster = {
if (Option(contentBytes).isEmpty || contentBytes.isEmpty) {
// Handle empty rasters: return a placeholder with a null dataset and an empty path (formerly also tagged with UUID -1L)
new MosaicRasterGDAL(-1L, null, "", false, parentPath, "", -1)
new MosaicRasterGDAL(null, "", false, parentPath, "", -1)
} else {
// This is a temp UUID for purposes of reading the raster through GDAL from memory
// The stable UUID is kept in metadata of the raster
val uuid = Murmur3.hash64(UUID.randomUUID().toString.getBytes())
val uuid = UUID.randomUUID().toString
val extension = GDAL.getExtension(driverShortName)
val virtualPath = s"/vsimem/$uuid.$extension"
gdal.FileFromMemBuffer(virtualPath, contentBytes)
Expand All @@ -438,7 +429,7 @@ object MosaicRasterGDAL extends RasterReader {
gdal.FileFromMemBuffer(virtualPath, contentBytes)
openRaster(zippedPath, driverShortName)
})
val raster = new MosaicRasterGDAL(uuid, dataset, virtualPath, false, parentPath, driverShortName, contentBytes.length)
val raster = new MosaicRasterGDAL(dataset, virtualPath, false, parentPath, driverShortName, contentBytes.length)
raster
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object NDVI {
val numLines = redBand.GetYSize
val lineSize = redBand.GetXSize

val ndviPath = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension)
val ndviPath = PathUtils.createTmpFilePath(raster.getExtension)
val ndviRaster = newNDVIRaster(raster, ndviPath)

var outputLine: Array[Double] = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object RasterClipByVector {
val rasterCRS = raster.getRaster.GetSpatialRef()
val outShortName = raster.getRaster.GetDriver().getShortName

val resultFileName = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension)
val resultFileName = PathUtils.createTmpFilePath(raster.getExtension)

val shapeFileName = VectorClipper.generateClipper(geometry, geomCRS, rasterCRS, geometryAPI)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import com.databricks.labs.mosaic.utils.PathUtils
object MergeBands {

def merge(rasters: Seq[MosaicRaster], resampling: String): MosaicRaster = {
val rasterUUID = java.util.UUID.randomUUID.toString
val outShortName = rasters.head.getRaster.GetDriver.getShortName

val vrtPath = PathUtils.createTmpFilePath(rasterUUID, "vrt")
val rasterPath = PathUtils.createTmpFilePath(rasterUUID, "tif")
val vrtPath = PathUtils.createTmpFilePath("vrt")
val rasterPath = PathUtils.createTmpFilePath("tif")

val vrtRaster = GDALBuildVRT.executeVRT(
vrtPath,
Expand All @@ -31,11 +30,10 @@ object MergeBands {
}

def merge(rasters: Seq[MosaicRaster], pixel: (Double, Double), resampling: String): MosaicRaster = {
val rasterUUID = java.util.UUID.randomUUID.toString
val outShortName = rasters.head.getRaster.GetDriver.getShortName

val vrtPath = PathUtils.createTmpFilePath(rasterUUID, "vrt")
val rasterPath = PathUtils.createTmpFilePath(rasterUUID, "tif")
val vrtPath = PathUtils.createTmpFilePath("vrt")
val rasterPath = PathUtils.createTmpFilePath("tif")

val vrtRaster = GDALBuildVRT.executeVRT(
vrtPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@ import org.gdal.gdal.gdal
object MergeRasters {

def merge(rasters: Seq[MosaicRaster]): MosaicRaster = {
val rasterUUID = java.util.UUID.randomUUID.toString
val outShortName = rasters.head.getDriversShortName
val extension = gdal.GetDriverByName(outShortName).GetMetadataItem("DMD_EXTENSION")

val rasterPath = PathUtils.createTmpFilePath(rasterUUID, extension)
val rasterPath = PathUtils.createTmpFilePath(extension)

val result2 = GDALWarp.executeWarp(
val result = GDALWarp.executeWarp(
rasterPath,
isTemp = true,
rasters,
command = s"gdalwarp -r bilinear -of $outShortName -co COMPRESS=PACKBITS -overwrite"
)

result2
result
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ object RasterProject {
def project(raster: MosaicRaster, destCRS: SpatialReference): MosaicRaster = {
val outShortName = raster.getRaster.GetDriver().getShortName

val resultFileName = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension)
val resultFileName = PathUtils.createTmpFilePath(raster.getExtension)

// Note that Null is the right value here
val authName = destCRS.GetAuthorityName(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ object OverlappingTiles {
val width = Math.min(tileWidth, xSize - i) + 1
val height = Math.min(tileHeight, ySize - j) + 1

val uuid = java.util.UUID.randomUUID.toString
val fileExtension = raster.getExtension
val rasterPath = PathUtils.createTmpFilePath(uuid, fileExtension)
val rasterPath = PathUtils.createTmpFilePath(fileExtension)
val shortName = raster.getRaster.GetDriver.getShortName

val result = GDALTranslate.executeTranslate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ object ReTile {
val xMin = if (x == 0) x * tileWidth else x * tileWidth - 1
val yMin = if (y == 0) y * tileHeight else y * tileHeight - 1

val rasterUUID = java.util.UUID.randomUUID.toString
val fileExtension = raster.getRasterFileExtension
val rasterPath = PathUtils.createTmpFilePath(rasterUUID, fileExtension)
val rasterPath = PathUtils.createTmpFilePath(fileExtension)
val shortDriver = raster.getRaster.GetDriver().getShortName

val result = GDALTranslate.executeTranslate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ case class RST_MergeAgg(
override lazy val deterministic: Boolean = true
override val child: Expression = rasterExpr
override val nullable: Boolean = false
override val dataType: DataType = RasterTileType(LongType)
override val dataType: DataType = RasterTileType(expressionConfig.getCellIdType)
override def prettyName: String = "rst_merge_agg"

val rasterAPI: RasterAPI = RasterAPI.apply(expressionConfig.getRasterAPI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.databricks.labs.mosaic.core.raster.gdal_raster.RasterCleaner
import com.databricks.labs.mosaic.core.types.RasterTileType
import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile
import com.databricks.labs.mosaic.functions.MosaicExpressionConfig
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.{DataType, StructType}

trait RasterExpressionSerialization {

Expand All @@ -20,7 +20,7 @@ trait RasterExpressionSerialization {
if (returnsRaster) {
val tile = data.asInstanceOf[MosaicRasterTile]
val checkpoint = expressionConfig.getRasterCheckpoint
val rasterType = outputDataType.asInstanceOf[RasterTileType].rasterType
val rasterType = outputDataType.asInstanceOf[StructType].fields(1).dataType
val result = tile
.formatCellId(IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem))
.serialize(rasterAPI, rasterType, checkpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,9 @@ object MosaicContext extends Logging {
val sparkVersion = spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "")
val isML = sparkVersion.contains("-ml-")
val isPhoton = spark.conf.getOption("spark.databricks.photon.enabled").getOrElse("false").toBoolean
val isTest = spark.conf.getOption("spark.databricks.clusterUsageTags.clusterType").getOrElse("true").toBoolean

if (!isML && !isPhoton) {
if (!isML && !isPhoton && !isTest) {
// Print out the warnings both to the log and to the console
logWarning("DEPRECATION WARNING: Mosaic is not supported on the selected Databricks Runtime")
logWarning("DEPRECATION WARNING: Mosaic will stop working on this cluster after v0.3.x.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ object PathUtils {
}
}

def createTmpFilePath(uuid: String, extension: String): String = {
def createTmpFilePath(extension: String): String = {
val randomID = UUID.randomUUID()
val uuid = UUID.randomUUID().toString
val tmpDir = Files.createTempDirectory(s"mosaic_$randomID").toFile.getAbsolutePath
val outPath = s"$tmpDir/raster_${uuid.replace("-", "_")}.$extension"
Files.createDirectories(Paths.get(outPath).getParent)
Expand Down

0 comments on commit 7170697

Please sign in to comment.