Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GDAL UDF example in docs #579

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/api/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ API Documentation
spatial-predicates
spatial-aggregations
raster-functions
rasterio-udfs
rasterio-gdal-udfs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
=====================
Rasterio UDFs
Rasterio + GDAL UDFs
=====================


Expand All @@ -12,7 +12,8 @@ It is a great library for working with raster data in Python and it is a popular
Rasterio UDFs provide a way to use Rasterio Python API in Spark for distributed processing of raster data.
The data structures used by Mosaic are compatible with Rasterio and can be used interchangeably.
In this section we will show how to use Rasterio UDFs to process raster data in Mosaic + Spark.
We assume that you have a basic understanding of Rasterio and GDAL.
We assume that you have a basic understanding of Rasterio and GDAL. We also provide an example which directly calls GDAL
Translate and Warp.

Please note that we advise the users to set these configuration to ensure proper distribution.

Expand All @@ -22,7 +23,7 @@ Please note that we advise the users to set these configuration to ensure proper
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.set("spark.sql.shuffle.partitions", "400") # maybe higher, depending


Rasterio raster plotting
Expand Down Expand Up @@ -238,7 +239,7 @@ Next we will define a function that will write a given raster file to disk. A "g
not want to have a file context manager open when you go to write out its context as the context manager will not yet
have been flushed. Another "gotcha" might be that the raster dataset does not have CRS included; if this arises, we
recommend adjusting the function to specify the CRS and set it on the dst variable, more at
`rasterio.crs <https://rasterio.readthedocs.io/en/stable/api/rasterio.crs.html>`_. We would also point out that notional
`rasterio.crs <https://rasterio.readthedocs.io/en/stable/api/rasterio.crs.html>`__. We would also point out that notional
"file_id" param can be constructed as a repeatable name from other field(s) in your dataframe / table or be random,
depending on your needs.

Expand Down Expand Up @@ -355,3 +356,77 @@ Finally we will apply the function to the DataFrame.
| /dbfs/path/to/output/dir/3215.tif |
| ... |
+-------------------------------------------+


UDF example for generating Google Maps compatible tiles
#######################################################

Delta Tables can be used as the basis for serving pre-generated tiles as an option. Here is an example UDF that applies
a few gdal operations on each band, to write to Google Maps Compatible tiles transformed into 3857 (Web Mercator). Note:
the 'quadbin' column shown in this example was generated separately using CARTO's `quadbin <https://pypi.org/project/quadbin/>`__
package. You can replace the calls with whatever you need to do. The output structure looks something like the following:

.. figure:: ../images/rasterio/quadbin.png
:figclass: doc-figure

The UDF example sets raster extent, block size, and interpolation. It specifies source SRID as 4326;
additionally, output type and nodata values are specified. COG overviews are not generated
nor is an ALPHA band, but they could be. Again, you would modify this example to suit your needs.

.. code-block:: python

@udf("binary")
def transform_raw_raster(raster):
import tempfile
import uuid
from osgeo import gdal

with tempfile.TemporaryDirectory() as tmp_dir:
fn1 = f"{tmp_dir}/{uuid.uuid4().hex}.tif"
fn2 = f"{tmp_dir}/{uuid.uuid4().hex}.tif"
fn3 = f"{tmp_dir}/{uuid.uuid4().hex}.tif"
fn4 = f"{tmp_dir}/{uuid.uuid4().hex}.tif"

with open(fn1, "wb") as f:
f.write(raster)

gdal.Translate(fn2, fn1, options="-of GTiff -a_ullr -180 90 180 -90 -a_nodata -32767 -ot Int16")
gdal.Warp(fn3, fn2, options= "-tr 0.125 -0.125 -r cubicspline")
gdal.Warp(fn4, fn3, options= "-of COG -co BLOCKSIZE=1024 -co TILING_SCHEME=GoogleMapsCompatible -co COMPRESS=DEFLATE -co OVERVIEWS=NONE -co ADD_ALPHA=NO -co RESAMPLING=cubicspline -s_srs EPSG:4326")

with open(fn4, "rb") as f:
res = f.read()
return res

Example of calling the UDF (original data was NetCDF). If you have more than 1 band, this assumes :code:`transform_raw_rasters` UDF is called after
:code:`rst_separatebands` function (or you could potentially modify the UDF to operate on multiple bands).

.. code-block:: python

base_table = (
df
.select(
"path",
"metadata",
"tile"
)
.withColumn("subdatasets", mos.rst_subdatasets("tile"))
.where(F.array_contains(F.map_values("subdatasets"), "sfcWind"))
.withColumn("tile", mos.rst_getsubdataset("tile", F.lit("sfcWind")))
.withColumn("tile", mos.rst_separatebands("tile"))
.repartition(sc.defaultParallelism)
.withColumn(
"tile",
F.col("tile")
.withField("raster", transform_raw_raster("tile.raster"))
.withField(
"metadata",
F.map_concat("tile.metadata", F.create_map(F.lit("driver"), F.lit("GTiff")))
)
)
.withColumn("srid", mos.rst_srid("tile"))
.withColumn("srid", F.when(F.col("srid") == F.lit(0), F.lit(4326)).otherwise(F.col("srid")))
.withColumn("timestep", F.element_at(mos.rst_metadata("tile"), "NC_GLOBAL#GDAL_MOSAIC_BAND_INDEX"))
.withColumn("tile", mos.rst_transform("tile", F.lit(3857)))
.repartition(sc.defaultParallelism, "timestep")
)
Binary file added docs/source/images/rasterio/quadbin.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading