Skip to content

Commit

Permalink
Fix NDVI and missing pixel issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
milos.colic committed Sep 26, 2023
1 parent c3025bc commit ddab8cc
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 41 deletions.
22 changes: 19 additions & 3 deletions notebooks/prototypes/grid_tiles/01 Gridded EOD Data.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
tiles_df = catalog_df\
.repartition(200, F.rand())\
.withColumn("raster", mos.rst_subdivide("outputfile", F.lit(8)))\
.withColumn("size", mos.rst_memsize("raster"))
.withColumn("size", mos.rst_memsize("raster"))\
.where(~mos.rst_isempty("raster"))

# COMMAND ----------

Expand All @@ -78,7 +79,7 @@

# COMMAND ----------

library.plot_raster(to_plot[12]["raster"])
library.plot_raster(to_plot[7]["raster"])

# COMMAND ----------

Expand All @@ -98,13 +99,24 @@

# COMMAND ----------

library.plot_raster(to_plot[18]["raster"]["raster"])
library.plot_raster(to_plot[2]["raster"]["raster"])

# COMMAND ----------

spark.read.table("alaska_b08")\
.withColumn("souce_band", F.col("asset.name"))\
.repartition(200, F.rand())\
.where(F.expr("rst_tryopen(outputfile)"))\
.withColumn("raster", mos.rst_subdivide("outputfile", F.lit(8)))\

.display()

# COMMAND ----------

def index_band(band_table, resolution):
catalog_df = \
spark.read.table(band_table)\
.where(F.expr("rst_tryopen(outputfile)"))\
.withColumn("souce_band", F.col("asset.name"))

tiles_df = catalog_df\
Expand Down Expand Up @@ -139,6 +151,10 @@ def index_band(band_table, resolution):

# COMMAND ----------

index_band("alaska_b08", 6)

# COMMAND ----------

grid_tessellate_df = spark.read.table("alaska_b02_indexed")
grid_tessellate_df.limit(20).display()

Expand Down
37 changes: 14 additions & 23 deletions notebooks/prototypes/grid_tiles/03 Band Stacking.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
.withColumn("h3", F.col("raster.index_id"))
df_b04 = spark.read.table("alaska_b04_indexed")\
.withColumn("h3", F.col("raster.index_id"))
df_b08 = spark.read.table("alaska_b08_indexed")\
.withColumn("h3", F.col("raster.index_id"))

# COMMAND ----------

Expand Down Expand Up @@ -97,6 +99,9 @@
df_b04_resolved = df_b04.groupBy("h3", "date")\
.agg(mos.rst_merge(F.collect_list("raster.raster")).alias("raster"))

df_b08_resolved = df_b08.groupBy("h3", "date")\
.agg(mos.rst_merge(F.collect_list("raster.raster")).alias("raster"))


# COMMAND ----------

Expand Down Expand Up @@ -124,7 +129,13 @@
.withColumnRenamed("raster", "b04"),
on = ["h3", "date"]
)\
.withColumn("raster", mos.rst_mergebands(F.array("b04", "b03", "b02"))) # b04 = red b03 = blue b02 = green b08 = nir
.join(
df_b08_resolved\
.repartition(200, F.rand())\
.withColumnRenamed("raster", "b08"),
on = ["h3", "date"]
)\
.withColumn("raster", mos.rst_mergebands(F.array("b04", "b03", "b02", "b08"))) # b04 = red b03 = blue b02 = green b08 = nir

# COMMAND ----------

Expand All @@ -137,7 +148,7 @@
# COMMAND ----------

ndvi_test = stacked_df.withColumn(
"ndvi", mos.rst_ndvi("raster", F.lit(1), F.lit(2))
"ndvi", mos.rst_ndvi("raster", F.lit(4), F.lit(1))
)

# COMMAND ----------
Expand All @@ -146,28 +157,8 @@

# COMMAND ----------

library.plot_raster(to_plot[12]["test"])
library.plot_raster(to_plot[4]["ndvi"])

# COMMAND ----------

stacked_df.createOrReplaceTempView("multiband")

# COMMAND ----------

to_plot = stacked_df.limit(50).collect()

# COMMAND ----------

library.plot_raster(to_plot[0]["b04"])

# COMMAND ----------

library.plot_raster(to_plot[0]["b03"])

# COMMAND ----------

library.plot_raster(to_plot[2]["b02"])

# COMMAND ----------

library.plot_raster(to_plot[2]["ndvi"])
Binary file not shown.
21 changes: 21 additions & 0 deletions python/mosaic/api/raster.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,27 @@ def rst_tessellate(raster: ColumnOrName, resolution: ColumnOrName) -> Column:
)


def rst_tryopen(raster: ColumnOrName) -> Column:
"""
Tries to open the raster and returns a flag indicating if the raster can be opened.
Parameters
----------
raster : Column (StringType)
Path to the raster file.
Returns
-------
Column (BooleanType)
Whether the raster can be opened.
"""
return config.mosaic_context.invoke_function(
"rst_tryopen",
pyspark_to_java_column(raster)
)


def rst_subdivide(raster: ColumnOrName, size_in_mb: ColumnOrName) -> Column:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ abstract class MosaicRaster(
/** @return Returns the y size of the raster. */
def ySize: Int

/** @return Returns the diagonal size of the raster. */
def diagSize: Double = math.sqrt(xSize * xSize + ySize * ySize)

/** @return Returns the bandId-th Band from the raster. */
def getBand(bandId: Int): MosaicRasterBand

Expand All @@ -88,6 +91,33 @@ abstract class MosaicRaster(
/** @return Returns the path to the raster file. */
def getGeoTransform: Array[Double]

/** @return Returns pixel x size. */
def pixelXSize: Double = getGeoTransform(1)

/** @return Returns pixel y size. */
def pixelYSize: Double = getGeoTransform(5)

/** @return Returns the origin x coordinate. */
def originX: Double = getGeoTransform(0)

/** @return Returns the origin y coordinate. */
def originY: Double = getGeoTransform(3)

/** @return Returns the max x coordinate. */
def xMax: Double = originX + xSize * pixelXSize

/** @return Returns the max y coordinate. */
def yMax: Double = originY + ySize * pixelYSize

/** @return Returns the min x coordinate. */
def xMin: Double = originX

/** @return Returns the min y coordinate. */
def yMin: Double = originY

/** @return Returns the diagonal size of a pixel. */
def pixelDiagSize: Double = math.sqrt(pixelXSize * pixelXSize + pixelYSize * pixelYSize)

/** @return Returns the GDAL Dataset representing the raster. */
def getRaster: Dataset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B

override def getRasterForCell(cellID: Long, indexSystem: IndexSystem, geometryAPI: GeometryAPI): MosaicRaster = {
val cellGeom = indexSystem.indexToGeometry(cellID, geometryAPI)
// buffer by diagonal size of the raster pixel to avoid clipping issues
// add 1% to avoid rounding errors
val bufferR = pixelDiagSize*1.01
val bufferedCell = cellGeom.buffer(bufferR)
val geomCRS =
if (cellGeom.getSpatialReference == 0) wsg84
else {
Expand All @@ -164,7 +168,7 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B
geomCRS.SetAxisMappingStrategy(osr.osrConstants.OAMS_TRADITIONAL_GIS_ORDER)
geomCRS
}
RasterClipByVector.clip(this, cellGeom, geomCRS, geometryAPI)
RasterClipByVector.clip(this, bufferedCell, geomCRS, geometryAPI)
}

/**
Expand All @@ -180,7 +184,11 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B
gdal.Unlink(path)
}
if (isTemp) {
Files.delete(Paths.get(path))
try {
Files.delete(Paths.get(path))
} catch {
case _: Throwable => ()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import com.databricks.labs.mosaic.utils.PathUtils

object NDVI {

def emptyCopy(raster: MosaicRaster, path: String): MosaicRaster = {
val doubleDataType: Int = org.gdal.gdalconst.gdalconstConstants.GDT_Float64

def newNDVIRaster(raster: MosaicRaster, path: String): MosaicRaster = {
val driver = raster.getRaster.GetDriver()
val newRaster = driver.Create(path, raster.xSize, raster.ySize, raster.numBands, raster.getRaster.GetRasterBand(1).getDataType)
// NDVI is always a single band raster with double data type
val newRaster = driver.Create(path, raster.xSize, raster.ySize, 1, doubleDataType)
newRaster.SetGeoTransform(raster.getRaster.GetGeoTransform)
newRaster.SetProjection(raster.getRaster.GetProjection)
MosaicRasterGDAL(newRaster, path, isTemp = true)
Expand All @@ -23,25 +26,25 @@ object NDVI {
val lineSize = redBand.GetXSize

val ndviPath = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension)
val ndviRaster = emptyCopy(raster, ndviPath)
val ndviRaster = newNDVIRaster(raster, ndviPath)

var outputLine: Array[Double] = null
var redScanline: Array[Double] = null
var nirScanline: Array[Double] = null
val dataType = org.gdal.gdalconst.gdalconstConstants.GDT_Float64

for (line <- Range(0, numLines)) {
redScanline = Array.fill[Double](lineSize)(0.0)
nirScanline = Array.fill[Double](lineSize)(0.0)
redBand.ReadRaster(0, line, lineSize, 1, dataType, redScanline)
nirBand.ReadRaster(0, line, lineSize, 1, dataType, nirScanline)
redBand.ReadRaster(0, line, lineSize, 1, doubleDataType, redScanline)
nirBand.ReadRaster(0, line, lineSize, 1, doubleDataType, nirScanline)

outputLine = redScanline.zip(nirScanline).map { case (red, nir) =>
if (red + nir == 0) 0.0
else (nir - red) / (red + nir)
}
ndviRaster.getRaster
.GetRasterBand(1)
.WriteRaster(0, line, lineSize, 1, dataType, outputLine.array)
.WriteRaster(0, line, lineSize, 1, doubleDataType, outputLine.array)
}
outputLine = null
redScanline = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import com.databricks.labs.mosaic.core.types.model.MosaicRasterChip
object RasterTessellate {

def tessellate(raster: MosaicRaster, resolution: Int, indexSystem: IndexSystem, geometryAPI: GeometryAPI): Seq[MosaicRasterChip] = {

Array.fill(10)(1.0)

val indexSR = indexSystem.osrSpatialRef
val bbox = raster.bbox(geometryAPI, indexSR)
val cells = Mosaic.mosaicFill(bbox, resolution, keepCoreGeom = false, indexSystem, geometryAPI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ object ReTile {
val bbox = geometryAPI.createBbox(xMin, yMin, xMin + tileWidth, yMin + tileHeight)
.mapXY((x, y) => rasterAPI.toWorldCoord(raster.getGeoTransform, x.toInt, y.toInt))

RasterClipByVector.clip(raster, bbox, raster.getRaster.GetSpatialRef(), geometryAPI)
// buffer bbox by the diagonal size of the raster to ensure we get all the pixels in the tile
val bufferR = raster.pixelDiagSize * 1.01
val bufferedBBox = bbox.buffer(bufferR)

RasterClipByVector.clip(raster, bufferedBBox, raster.getRaster.GetSpatialRef(), geometryAPI)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ case class RST_IsEmpty(raster: Expression, expressionConfig: MosaicExpressionCon

/** Returns true if the raster is empty. */
override def rasterTransform(raster: MosaicRaster): Any = {
val result = raster.ySize == 0 && raster.xSize == 0
val result = (raster.ySize == 0 && raster.xSize == 0) || raster.isEmpty
RasterCleaner.dispose(raster)
result
}
Expand All @@ -40,7 +40,7 @@ object RST_IsEmpty extends WithExpressionInfo {
| """.stripMargin

override def builder(expressionConfig: MosaicExpressionConfig): FunctionBuilder = {
GenericExpressionFactory.getBaseBuilder[RST_IsEmpty](2, expressionConfig)
GenericExpressionFactory.getBaseBuilder[RST_IsEmpty](1, expressionConfig)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.databricks.labs.mosaic.expressions.raster

import com.databricks.labs.mosaic.core.raster.MosaicRaster
import com.databricks.labs.mosaic.core.raster.gdal_raster.RasterCleaner
import com.databricks.labs.mosaic.expressions.base.{GenericExpressionFactory, WithExpressionInfo}
import com.databricks.labs.mosaic.expressions.raster.base.RasterExpression
import com.databricks.labs.mosaic.functions.MosaicExpressionConfig
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.{Expression, NullIntolerant}
import org.apache.spark.sql.types._

/** Returns true if the raster is empty. */
case class RST_TryOpen(raster: Expression, expressionConfig: MosaicExpressionConfig)
extends RasterExpression[RST_TryOpen](raster, BooleanType, returnsRaster = false, expressionConfig)
with NullIntolerant
with CodegenFallback {

/** Returns true if the raster can be opened. */
override def rasterTransform(raster: MosaicRaster): Any = {
val result = Option(raster.getRaster).isDefined
RasterCleaner.dispose(raster)
result
}

}

/** Expression info required for the expression registration for spark SQL. */
object RST_TryOpen extends WithExpressionInfo {

override def name: String = "rst_tryopen"

override def usage: String = "_FUNC_(expr1) - Returns true if the raster can be opened."

override def example: String =
"""
| Examples:
| > SELECT _FUNC_(a);
| false
| """.stripMargin

override def builder(expressionConfig: MosaicExpressionConfig): FunctionBuilder = {
GenericExpressionFactory.getBaseBuilder[RST_TryOpen](1, expressionConfig)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI, rasterAP
mosaicRegistry.registerExpression[RST_Subdatasets](expressionConfig)
mosaicRegistry.registerExpression[RST_Summary](expressionConfig)
mosaicRegistry.registerExpression[RST_Tessellate](expressionConfig)
mosaicRegistry.registerExpression[RST_TryOpen](expressionConfig)
mosaicRegistry.registerExpression[RST_Subdivide](expressionConfig)
mosaicRegistry.registerExpression[RST_UpperLeftX](expressionConfig)
mosaicRegistry.registerExpression[RST_UpperLeftY](expressionConfig)
Expand Down Expand Up @@ -699,6 +700,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI, rasterAP
ColumnAdapter(RST_Tessellate(col(raster).expr, resolution.expr, expressionConfig))
def rst_tessellate(raster: Column, resolution: Int): Column =
ColumnAdapter(RST_Tessellate(raster.expr, lit(resolution).expr, expressionConfig))
def rst_tryopen(raster: Column): Column = ColumnAdapter(RST_TryOpen(raster.expr, expressionConfig))
def rst_subdivide(raster: Column, sizeInMB: Column): Column =
ColumnAdapter(RST_Subdivide(raster.expr, sizeInMB.expr, expressionConfig))
def rst_subdivide(raster: Column, sizeInMB: Int): Column =
Expand Down

0 comments on commit ddab8cc

Please sign in to comment.