From 71706978a2df988d20e3204732be3b2e735feab7 Mon Sep 17 00:00:00 2001 From: "milos.colic" Date: Fri, 13 Oct 2023 11:14:15 +0100 Subject: [PATCH] Remove redundant UUID definitions. Remove Murmur hashing of UUIDs. Add a check for tests to warning prints. Fix data types in MergeAgg and RasterExpressionSerialization. --- .../mosaic/core/raster/MosaicRaster.scala | 2 -- .../mosaic/core/raster/api/RasterAPI.scala | 4 ++- .../raster/gdal_raster/MosaicRasterGDAL.scala | 27 +++++++------------ .../mosaic/core/raster/operator/NDVI.scala | 2 +- .../operator/clip/RasterClipByVector.scala | 2 +- .../raster/operator/merge/MergeBands.scala | 10 +++---- .../raster/operator/merge/MergeRasters.scala | 7 +++-- .../raster/operator/proj/RasterProject.scala | 2 +- .../operator/retile/OverlappingTiles.scala | 3 +-- .../core/raster/operator/retile/ReTile.scala | 3 +-- .../expressions/raster/RST_MergeAgg.scala | 2 +- .../base/RasterExpressionSerialization.scala | 4 +-- .../labs/mosaic/functions/MosaicContext.scala | 3 ++- .../labs/mosaic/utils/PathUtils.scala | 3 ++- 14 files changed, 31 insertions(+), 43 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/MosaicRaster.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/MosaicRaster.scala index 996006beb..02b8b4a53 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/MosaicRaster.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/MosaicRaster.scala @@ -50,8 +50,6 @@ abstract class MosaicRaster( def getDimensions: (Int, Int) - def uuid: Long - def getExtension: String def getPath: String diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/RasterAPI.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/RasterAPI.scala index bd56093ba..c6023f55a 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/RasterAPI.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/RasterAPI.scala @@ -7,6 +7,8 @@ import org.apache.spark.sql.types.{BinaryType, DataType, StringType} import org.apache.spark.unsafe.types.UTF8String import org.gdal.gdal.gdal +import java.util.UUID + /** * A base trait for all Raster API's. * @param reader @@ -45,7 +47,7 @@ abstract class RasterAPI(reader: RasterReader) extends Serializable { rasterDT match { case StringType => val extension = raster.getRaster.GetDriver().GetMetadataItem("DMD_EXTENSION") - val writePath = s"$checkpointPath/${raster.uuid}.$extension" + val writePath = s"$checkpointPath/${UUID.randomUUID()}.$extension" val outPath = raster.writeToPath(writePath) RasterCleaner.dispose(raster) UTF8String.fromString(outPath) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala index 339ed2ee1..82b80f426 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala @@ -8,7 +8,6 @@ import com.databricks.labs.mosaic.core.raster._ import com.databricks.labs.mosaic.core.raster.operator.clip.RasterClipByVector import com.databricks.labs.mosaic.core.types.model.GeometryTypeEnum.POLYGON import com.databricks.labs.mosaic.utils.PathUtils -import org.apache.orc.util.Murmur3 import org.gdal.gdal.gdal.GDALInfo import org.gdal.gdal.{Dataset, InfoOptions, gdal} import org.gdal.gdalconst.gdalconstConstants._ @@ -24,7 +23,6 @@ import scala.util.Try /** GDAL implementation of the MosaicRaster trait. */ //noinspection DuplicatedCode class MosaicRasterGDAL( - _uuid: Long, var raster: Dataset, path: String, isTemp: Boolean, @@ -211,7 +209,7 @@ class MosaicRasterGDAL( override def getMemSize: Long = { if (memSize == -1) { if (PathUtils.isInMemory(path)) { - val tempPath = PathUtils.createTmpFilePath(this.uuid.toString, getExtension) + val tempPath = PathUtils.createTmpFilePath(getExtension) writeToPath(tempPath) this.refresh() val size = Files.size(Paths.get(tempPath)) @@ -260,7 +258,7 @@ class MosaicRasterGDAL( if (PathUtils.isInMemory(path)) { // Create a temporary directory to store the raster // This is needed because Files cannot read from /vsimem/ directly - val path = PathUtils.createTmpFilePath(uuid.toString, getExtension) + val path = PathUtils.createTmpFilePath(getExtension) writeToPath(path, destroy) val byteArray = Files.readAllBytes(Paths.get(path)) Files.delete(Paths.get(path)) @@ -290,8 +288,6 @@ class MosaicRasterGDAL( this.raster = openRaster(path) } - override def uuid: Long = _uuid - override def getExtension: String = { val driver = gdal.GetDriverByName(driverShortName) val extension = driver.GetMetadataItem("DMD_EXTENSION") @@ -320,7 +316,7 @@ class MosaicRasterGDAL( } override def asTemp: MosaicRaster = { - val temp = PathUtils.createTmpFilePath(uuid.toString, getExtension) + val temp = PathUtils.createTmpFilePath(getExtension) writeToPath(temp) if (PathUtils.isInMemory(path)) RasterCleaner.dispose(this) else this.destroy() @@ -375,15 +371,13 @@ object MosaicRasterGDAL extends RasterReader { driverShortName: String, memSize: Long ): MosaicRasterGDAL = { - val uuid = Murmur3.hash64(path.getBytes()) - val raster = new MosaicRasterGDAL(uuid, dataset, path, isTemp, parentPath, driverShortName, memSize) + val raster = new MosaicRasterGDAL(dataset, path, isTemp, parentPath, driverShortName, memSize) raster } def apply(path: String, isTemp: Boolean, parentPath: String, driverShortName: String, memSize: Long): MosaicRasterGDAL = { - val uuid = Murmur3.hash64(path.getBytes()) val dataset = openRaster(path, driverShortName) - val raster = new MosaicRasterGDAL(uuid, dataset, path, isTemp, parentPath, driverShortName, memSize) + val raster = new MosaicRasterGDAL(dataset, path, isTemp, parentPath, driverShortName, memSize) raster } @@ -403,8 +397,6 @@ object MosaicRasterGDAL extends RasterReader { val isSubdataset = PathUtils.isSubdataset(inPath) val localCopy = if (readDirect) inPath else PathUtils.copyToTmp(inPath) val path = PathUtils.getCleanPath(localCopy, localCopy.endsWith(".zip")) - - val uuid = Murmur3.hash64(path.getBytes()) val readPath = if (isSubdataset) PathUtils.getSubdatasetPath(path) else PathUtils.getZipPath(path) @@ -414,18 +406,17 @@ object MosaicRasterGDAL extends RasterReader { // It will be available when the raster is serialized for next operation // If value is needed then it will be computed when getMemSize is called // We cannot just use memSize value of the parent due to the fact that the raster could be a subdataset - val raster = new MosaicRasterGDAL(uuid, dataset, path, true, parentPath, driverShortName, -1) + val raster = new MosaicRasterGDAL(dataset, path, true, parentPath, driverShortName, -1) raster } override def readRaster(contentBytes: Array[Byte], parentPath: String, driverShortName: String): MosaicRaster = { if (Option(contentBytes).isEmpty || contentBytes.isEmpty) { - // Handle empty rasters, easy check to -1L as UUID for empty rasters - new MosaicRasterGDAL(-1L, null, "", false, parentPath, "", -1) + new MosaicRasterGDAL(null, "", false, parentPath, "", -1) } else { // This is a temp UUID for purposes of reading the raster through GDAL from memory // The stable UUID is kept in metadata of the raster - val uuid = Murmur3.hash64(UUID.randomUUID().toString.getBytes()) + val uuid = UUID.randomUUID().toString val extension = GDAL.getExtension(driverShortName) val virtualPath = s"/vsimem/$uuid.$extension" gdal.FileFromMemBuffer(virtualPath, contentBytes) @@ -438,7 +429,7 @@ object MosaicRasterGDAL extends RasterReader { gdal.FileFromMemBuffer(virtualPath, contentBytes) openRaster(zippedPath, driverShortName) }) - val raster = new MosaicRasterGDAL(uuid, dataset, virtualPath, false, parentPath, driverShortName, contentBytes.length) + val raster = new MosaicRasterGDAL(dataset, virtualPath, false, parentPath, driverShortName, contentBytes.length) raster } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/NDVI.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/NDVI.scala index fbe51624c..3b99b7196 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/NDVI.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/NDVI.scala @@ -28,7 +28,7 @@ object NDVI { val numLines = redBand.GetYSize val lineSize = redBand.GetXSize - val ndviPath = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension) + val ndviPath = PathUtils.createTmpFilePath(raster.getExtension) val ndviRaster = newNDVIRaster(raster, ndviPath) var outputLine: Array[Double] = null diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala index 19f45e01c..e4257038a 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala @@ -14,7 +14,7 @@ object RasterClipByVector { val rasterCRS = raster.getRaster.GetSpatialRef() val outShortName = raster.getRaster.GetDriver().getShortName - val resultFileName = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension) + val resultFileName = PathUtils.createTmpFilePath(raster.getExtension) val shapeFileName = VectorClipper.generateClipper(geometry, geomCRS, rasterCRS, geometryAPI) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeBands.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeBands.scala index a4825c588..04710b4aa 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeBands.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeBands.scala @@ -7,11 +7,10 @@ import com.databricks.labs.mosaic.utils.PathUtils object MergeBands { def merge(rasters: Seq[MosaicRaster], resampling: String): MosaicRaster = { - val rasterUUID = java.util.UUID.randomUUID.toString val outShortName = rasters.head.getRaster.GetDriver.getShortName - val vrtPath = PathUtils.createTmpFilePath(rasterUUID, "vrt") - val rasterPath = PathUtils.createTmpFilePath(rasterUUID, "tif") + val vrtPath = PathUtils.createTmpFilePath("vrt") + val rasterPath = PathUtils.createTmpFilePath("tif") val vrtRaster = GDALBuildVRT.executeVRT( vrtPath, @@ -31,11 +30,10 @@ object MergeBands { } def merge(rasters: Seq[MosaicRaster], pixel: (Double, Double), resampling: String): MosaicRaster = { - val rasterUUID = java.util.UUID.randomUUID.toString val outShortName = rasters.head.getRaster.GetDriver.getShortName - val vrtPath = PathUtils.createTmpFilePath(rasterUUID, "vrt") - val rasterPath = PathUtils.createTmpFilePath(rasterUUID, "tif") + val vrtPath = PathUtils.createTmpFilePath("vrt") + val rasterPath = PathUtils.createTmpFilePath("tif") val vrtRaster = GDALBuildVRT.executeVRT( vrtPath, diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala index a8507ec52..e7c962db2 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala @@ -8,20 +8,19 @@ import org.gdal.gdal.gdal object MergeRasters { def merge(rasters: Seq[MosaicRaster]): MosaicRaster = { - val rasterUUID = java.util.UUID.randomUUID.toString val outShortName = rasters.head.getDriversShortName val extension = gdal.GetDriverByName(outShortName).GetMetadataItem("DMD_EXTENSION") - val rasterPath = PathUtils.createTmpFilePath(rasterUUID, extension) + val rasterPath = PathUtils.createTmpFilePath(extension) - val result2 = GDALWarp.executeWarp( + val result = GDALWarp.executeWarp( rasterPath, isTemp = true, rasters, command = s"gdalwarp -r bilinear -of $outShortName -co COMPRESS=PACKBITS -overwrite" ) - result2 + result } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/proj/RasterProject.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/proj/RasterProject.scala index 2a0d60bc5..c1022028b 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/proj/RasterProject.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/proj/RasterProject.scala @@ -10,7 +10,7 @@ object RasterProject { def project(raster: MosaicRaster, destCRS: SpatialReference): MosaicRaster = { val outShortName = raster.getRaster.GetDriver().getShortName - val resultFileName = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension) + val resultFileName = PathUtils.createTmpFilePath(raster.getExtension) // Note that Null is the right value here val authName = destCRS.GetAuthorityName(null) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala index f5d146a7d..b1539a2fc 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala @@ -26,9 +26,8 @@ object OverlappingTiles { val width = Math.min(tileWidth, xSize - i) + 1 val height = Math.min(tileHeight, ySize - j) + 1 - val uuid = java.util.UUID.randomUUID.toString val fileExtension = raster.getExtension - val rasterPath = PathUtils.createTmpFilePath(uuid, fileExtension) + val rasterPath = PathUtils.createTmpFilePath(fileExtension) val shortName = raster.getRaster.GetDriver.getShortName val result = GDALTranslate.executeTranslate( diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala index 9b988635f..59d990441 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala @@ -22,9 +22,8 @@ object ReTile { val xMin = if (x == 0) x * tileWidth else x * tileWidth - 1 val yMin = if (y == 0) y * tileHeight else y * tileHeight - 1 - val rasterUUID = java.util.UUID.randomUUID.toString val fileExtension = raster.getRasterFileExtension - val rasterPath = PathUtils.createTmpFilePath(rasterUUID, fileExtension) + val rasterPath = PathUtils.createTmpFilePath(fileExtension) val shortDriver = raster.getRaster.GetDriver().getShortName val result = GDALTranslate.executeTranslate( diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala index e530038f8..e117bfb21 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala @@ -34,7 +34,7 @@ case class RST_MergeAgg( override lazy val deterministic: Boolean = true override val child: Expression = rasterExpr override val nullable: Boolean = false - override val dataType: DataType = RasterTileType(LongType) + override val dataType: DataType = RasterTileType(expressionConfig.getCellIdType) override def prettyName: String = "rst_merge_agg" val rasterAPI: RasterAPI = RasterAPI.apply(expressionConfig.getRasterAPI) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala index 4e10ca001..bbfa9a163 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala @@ -6,7 +6,7 @@ import com.databricks.labs.mosaic.core.raster.gdal_raster.RasterCleaner import com.databricks.labs.mosaic.core.types.RasterTileType import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile import com.databricks.labs.mosaic.functions.MosaicExpressionConfig -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{DataType, StructType} trait RasterExpressionSerialization { @@ -20,7 +20,7 @@ trait RasterExpressionSerialization { if (returnsRaster) { val tile = data.asInstanceOf[MosaicRasterTile] val checkpoint = expressionConfig.getRasterCheckpoint - val rasterType = outputDataType.asInstanceOf[RasterTileType].rasterType + val rasterType = outputDataType.asInstanceOf[StructType].fields(1).dataType val result = tile .formatCellId(IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem)) .serialize(rasterAPI, rasterType, checkpoint) diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index d1b7cf4eb..b8869d1e6 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -989,8 +989,9 @@ object MosaicContext extends Logging { val sparkVersion = spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "") val isML = sparkVersion.contains("-ml-") val isPhoton = spark.conf.getOption("spark.databricks.photon.enabled").getOrElse("false").toBoolean + val isTest = spark.conf.getOption("spark.databricks.clusterUsageTags.clusterType").getOrElse("true").toBoolean - if (!isML && !isPhoton) { + if (!isML && !isPhoton && !isTest) { // Print out the warnings both to the log and to the console logWarning("DEPRECATION WARNING: Mosaic is not supported on the selected Databricks Runtime") logWarning("DEPRECATION WARNING: Mosaic will stop working on this cluster after v0.3.x.") diff --git a/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala b/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala index 3cfc0d968..db041e944 100644 --- a/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala +++ b/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala @@ -88,8 +88,9 @@ object PathUtils { } } - def createTmpFilePath(uuid: String, extension: String): String = { + def createTmpFilePath(extension: String): String = { val randomID = UUID.randomUUID() + val uuid = UUID.randomUUID().toString val tmpDir = Files.createTempDirectory(s"mosaic_$randomID").toFile.getAbsolutePath val outPath = s"$tmpDir/raster_${uuid.replace("-", "_")}.$extension" Files.createDirectories(Paths.get(outPath).getParent)