diff --git a/notebooks/prototypes/grid_tiles/00 Download STACs.py b/notebooks/prototypes/grid_tiles/00 Download STACs.py
new file mode 100644
index 000000000..879dfff85
--- /dev/null
+++ b/notebooks/prototypes/grid_tiles/00 Download STACs.py
@@ -0,0 +1,155 @@
+# Databricks notebook source
+# MAGIC %md
+# MAGIC ## Install the libraries and prepare the environment
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC For this demo we require a few spatial libraries that can easily be installed via pip. We will be using gdal, rasterio, pystac and databricks-mosaic for data download and data manipulation. We will use Planetary Computer as the source of the raster data for the analysis.
+
+# COMMAND ----------
+
+# MAGIC %pip install databricks-mosaic rasterio==1.3.5 gdal==3.4.3 pystac pystac_client planetary_computer --quiet
+
+# COMMAND ----------
+
+import json
+import library
+import pystac_client
+import planetary_computer
+import mosaic as mos
+
+from pyspark.sql import functions as F
+
+mos.enable_mosaic(spark, dbutils)
+mos.enable_gdal(spark)
+
+# COMMAND ----------
+
+spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC We have selected an area near Seattle for this demo. This is just an illustrative choice; the code will work with an area anywhere on the surface of the planet. Our solution provides an easy way to tessellate the area into indexed pieces, and this tessellation will allow us to parallelise the download of the data.
+
+# COMMAND ----------
+
+cells = library.generate_cells((-123, 47, -122, 48), 8, spark, mos)
+cells.display()
+
+# COMMAND ----------
+
+to_display = cells.select("grid.wkb")
+
+# COMMAND ----------
+
+# MAGIC %%mosaic_kepler
+# MAGIC to_display wkb geometry 200000
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC It is fairly easy to interface with pystac_client and remote raster data catalogs. We can browse resource collections and individual assets.
+
+# COMMAND ----------
+
+catalog = pystac_client.Client.open(
+    "https://planetarycomputer.microsoft.com/api/stac/v1",
+    modifier=planetary_computer.sign_inplace,
+)
+
+# COMMAND ----------
+
+collections = list(catalog.get_collections())
+collections
+
+# COMMAND ----------
+
+time_range = "2020-12-01/2020-12-31"
+bbox = [-123, 47, -122, 48]
+
+search = catalog.search(collections=["landsat-c2-l2"], bbox=bbox, datetime=time_range)
+items = search.item_collection()
+items
+
+# COMMAND ----------
+
+[json.dumps(item.to_dict()) for item in items]
+
+# COMMAND ----------
+
+cell_jsons = cells.select(
+    F.hash("geom").alias("area_id"),
+    F.col("grid.index_id").alias("h3"),
+    mos.st_asgeojson("grid.wkb").alias("geojson")
+)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC STAC catalogs support easy downloads for areas of interest provided as GeoJSONs. With this in mind we will convert all our H3 cells of interest into GeoJSONs and prepare STAC requests.
+
+# COMMAND ----------
+
+cell_jsons.display()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC Our framework allows for easy preparation of STAC requests with only one line of code. This data is Delta ready at this point and can easily be stored for lineage purposes.
+
+# COMMAND ----------
+
+eod_items = library.get_assets_for_cells(cell_jsons.limit(200))
+eod_items.display()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC From this point we can easily extract the download links for items of interest.
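+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC As a quick illustrative aside (a minimal sketch that only assumes the `items` collection returned by the search above), we can peek at a single STAC item to see which assets it exposes and where their signed download hrefs point. These hrefs are what we extract at scale below.
+
+# COMMAND ----------
+
+# Minimal sketch: list the asset names and hrefs of the first item returned by the STAC search.
+first_item = list(items)[0]
+{name: asset.href for name, asset in first_item.assets.items()}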
+
+# COMMAND ----------
+
+to_download = library.get_unique_hrefs(eod_items)
+to_download.display()
+
+# COMMAND ----------
+
+# MAGIC %fs mkdirs /FileStore/geospatial/odin/dais23demo
+
+# COMMAND ----------
+
+catalog_df = to_download\
+    .withColumn("outputfile", library.download_asset("href", F.lit("/dbfs/FileStore/geospatial/odin/dais23demo"), F.concat(F.hash(F.rand()), F.lit("tif"))))
+
+# COMMAND ----------
+
+catalog_df.write.format("delta").saveAsTable("mosaic_odin_files")
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC We have now downloaded all the tiles of interest and we can browse them from our Delta table.
+
+# COMMAND ----------
+
+catalog_df = spark.read.table("mosaic_odin_files")
+
+# COMMAND ----------
+
+catalog_df.display()
+
+# COMMAND ----------
+
+import rasterio
+from matplotlib import pyplot
+from rasterio.plot import show
+
+fig, ax = pyplot.subplots(1, figsize=(12, 12))
+raster = rasterio.open("""/dbfs/FileStore/geospatial/odin/dais23demo/1219604474tif""")
+show(raster, ax=ax, cmap='Greens')
+pyplot.show()
+
+# COMMAND ----------
+
+
diff --git a/notebooks/prototypes/grid_tiles/01 Gridded EOD Data.py b/notebooks/prototypes/grid_tiles/01 Gridded EOD Data.py
new file mode 100644
index 000000000..0f11d9863
--- /dev/null
+++ b/notebooks/prototypes/grid_tiles/01 Gridded EOD Data.py
@@ -0,0 +1,253 @@
+# Databricks notebook source
+# MAGIC %md
+# MAGIC ## Install the libraries and prepare the environment
+
+# COMMAND ----------
+
+# MAGIC %pip install rasterio==1.3.5 gdal==3.4.3 pystac pystac_client planetary_computer --quiet
+
+# COMMAND ----------
+
+# check if copy shared objects is really needed
+
+import library
+import mosaic as mos
+import rasterio
+
+from io import BytesIO
+from matplotlib import pyplot
+from rasterio.io import MemoryFile
+from pyspark.sql import functions as F
+
+mos.enable_mosaic(spark, dbutils)
+mos.enable_gdal(spark)
+
+# COMMAND ----------
+
+# MAGIC %reload_ext autoreload
+# MAGIC %autoreload 2
+# MAGIC %reload_ext library
+
+# COMMAND ----------
+
+spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Data load
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC We can easily browse the data we downloaded in notebook 00. The download metadata is stored as a Delta table.
+
+# COMMAND ----------
+
+catalog_df = spark.read.table("mosaic_odin_files")
+catalog_df.display()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC For raster data analysis the Mosaic framework provides distributed GDAL data readers.
+# MAGIC We can also retile the images on read to make sure the imagery is balanced and better parallelised.
+
+# COMMAND ----------
+
+tiles_df = catalog_df\
+    .withColumn("raster", mos.rst_subdivide("outputfile", F.lit(16)))\
+    .withColumn("size", mos.rst_memsize("raster"))
+
+# COMMAND ----------
+
+tiles_df.limit(50).display()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC At this point all our imagery is held in memory, but we can easily access it and visualise it.
+
+# COMMAND ----------
+
+to_plot = tiles_df.limit(50).collect()
+
+# COMMAND ----------
+
+library.plot_raster(to_plot[42]["raster"])
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC The Mosaic framework provides the same tessellation principles for both vector and raster data. We can project both vector and raster data into a unified grid, and from there it is very easy to combine and join raster to raster, vector to vector and raster to vector data.
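+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC As a minimal sketch of that idea (the polygon below is an illustrative assumption, not part of the original data), we can tessellate a vector geometry to the same H3 resolution and use the resulting cell ids as the join key against the tessellated raster tiles produced in the next cell.
+
+# COMMAND ----------
+
+# Minimal sketch: tessellate an illustrative area-of-interest polygon to H3 resolution 6.
+aoi_df = (
+    spark.createDataFrame([["POLYGON ((-122.3 47.5, -122.1 47.5, -122.1 47.7, -122.3 47.7, -122.3 47.5))"]], ["wkt"])
+    .select(mos.grid_tessellateexplode("wkt", F.lit(6)).alias("grid"))
+    .select(F.col("grid.index_id").alias("index_id"))
+)
+# Once the raster tiles are tessellated below, the two sides share the same grid key:
+# aoi_tiles = grid_tessellate_df.join(aoi_df, on=["index_id"])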
+
+# COMMAND ----------
+
+grid_tessellate_df = tiles_df\
+    .withColumn("raster", mos.rst_tessellate("raster", F.lit(6)))\
+    .withColumn("index_id", F.col("raster.index_id"))
+
+to_plot = grid_tessellate_df.limit(50).collect()
+
+# COMMAND ----------
+
+library.plot_raster(to_plot[15]["raster"]["raster"])
+
+# COMMAND ----------
+
+grid_tessellate_df2 = tiles_df\
+    .repartition(200)\
+    .withColumn("raster", mos.rst_tessellate("raster", F.lit(6)))\
+    .withColumn("index_id", F.col("raster.index_id"))
+
+grid_tessellate_df2.write.mode("overwrite").format("delta").save("dbfs:/FileStore/geospatial/odin/dais23demo/tessellated2")
+
+# COMMAND ----------
+
+to_plot = grid_tessellate_df2.limit(50).collect()
+
+# COMMAND ----------
+
+library.plot_raster(to_plot[6]["raster"]["raster"])
+
+# COMMAND ----------
+
+grid_tessellate_df.write.mode("overwrite").format("delta").save("dbfs:/FileStore/geospatial/odin/dais23demo/indexed")
+grid_tessellate_df = spark.read.format("delta").load("dbfs:/FileStore/geospatial/odin/dais23demo/indexed")
+
+# COMMAND ----------
+
+grid_tessellate_df.display()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Raster for arbitrary corridors
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC To illustrate how easy it is to combine vector and raster data we will use a traditionally hard problem: extraction of raster data for an arbitrary corridor.
+
+# COMMAND ----------
+
+line_example = "LINESTRING(-122.2163001236001 47.77530703528161,-122.1503821548501 47.51996083856245,-121.8867102798501 47.62743233444236,-122.0954505142251 47.360200479212935,-121.8152991470376 47.41970286748326,-121.5131751236001 47.360200479212935,-121.7603675064126 47.23726461439514,-122.2547522720376 47.0691640536914,-121.9361487564126 47.08038730142549,-121.3813391861001 47.10282670591806,-121.2110511001626 47.31925361681828,-120.9308997329751 47.56816499155946,-120.7661048111001 47.41226874260139,-121.1616126236001 47.11404286281199,-121.7933264907876 46.885516358226546,-122.3206702407876 46.79909683431514)"
+
+line_df = spark.createDataFrame([line_example], "string")\
+    .select(F.col("value").alias("wkt"))\
+    .select(
+        mos.grid_tessellateexplode("wkt", F.lit(6)).alias("grid")
+    )\
+    .select("grid.*")
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC We can visualise all the cells of interest for the provided arbitrary corridor. Since we are now operating in grid space, it is very easy to get all raster images that match this specification.
+
+# COMMAND ----------
+
+# MAGIC %%mosaic_kepler
+# MAGIC line_df index_id h3
+
+# COMMAND ----------
+
+cells_of_interest = grid_tessellate_df.repartition(40, F.rand()).join(line_df, on=["index_id"])
+
+# COMMAND ----------
+
+cells_of_interest.display()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC Our framework provides a very easy way to supply rasterio lambda functions that we can distribute and scale up without any additional effort from the end user.
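+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC As a minimal sketch (reusing the `library.rasterio_lambda` helper and the `cells_of_interest` dataframe from above), any function of a rasterio dataset that returns a number can be distributed this way, for example the maximum of band 1.
+
+# COMMAND ----------
+
+# Minimal sketch: a different per-tile statistic expressed as a rasterio lambda.
+def max_band_1(dataset):
+    try:
+        return dataset.statistics(bidx=1).max
+    except Exception:
+        return 0.0
+
+with_max = cells_of_interest.withColumn(
+    "rasterio_max", library.rasterio_lambda("raster.raster", lambda dataset: max_band_1(dataset))
+)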
+
+# COMMAND ----------
+
+src = rasterio.open("/dbfs/FileStore/geospatial/odin/dais23demo/1805789896tif")
+avg = src.statistics(bidx=1).mean
+avg
+
+# COMMAND ----------
+
+def mean_band_1(dataset):
+    try:
+        return dataset.statistics(bidx=1).mean
+    except Exception:
+        return 0.0
+
+with_measurement = cells_of_interest.withColumn(
+    "rasterio_lambda", library.rasterio_lambda("raster.raster", lambda dataset: mean_band_1(dataset))
+)
+
+# COMMAND ----------
+
+with_measurement.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("mosaic_odin_gridded")
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Raster to Timeseries projection
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC With the expressive power of rasterio and the distribution power of Mosaic we can easily convert rasters to numerical values with arbitrary mathematical complexity. Since all of our imagery is timestamped, our raster-to-number projection in effect creates time series bound to H3 cells.
+
+# COMMAND ----------
+
+with_measurement = spark.read.table("mosaic_odin_gridded")
+
+# COMMAND ----------
+
+with_measurement.where("rasterio_lambda > 0").display()
+
+# COMMAND ----------
+
+measurements = with_measurement\
+    .select(
+        "index_id",
+        "date",
+        "rasterio_lambda",
+        "wkb"
+    )\
+    .where("rasterio_lambda > 0")\
+    .groupBy("index_id", "date")\
+    .agg(
+        F.avg("rasterio_lambda").alias("measure"),
+        F.first("wkb").alias("wkb")
+    )
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC At this point our data has effectively become time series data and can be modelled as virtual IoT devices that are fixed in a spatial context.
+
+# COMMAND ----------
+
+measurements.display()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC We can easily visualise data for individual dates in a spatial context by leveraging our H3 locations.
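+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC Before filtering to individual dates, here is a minimal sketch (assuming only the `measurements` dataframe defined above) of collecting the full ordered time series per H3 cell.
+
+# COMMAND ----------
+
+# Minimal sketch: collect an ordered (date, measure) series per cell; structs sort by their first field, i.e. date.
+per_cell_series = (
+    measurements
+    .groupBy("index_id")
+    .agg(F.sort_array(F.collect_list(F.struct("date", "measure"))).alias("series"))
+)
+per_cell_series.display()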
+
+# COMMAND ----------
+
+df_12_13 = measurements.where("date == '2020-12-13'")
+df_12_05 = measurements.where("date == '2020-12-05'")
+
+# COMMAND ----------
+
+# MAGIC %%mosaic_kepler
+# MAGIC df_12_13 index_id h3 5000
+
+# COMMAND ----------
+
+# MAGIC %%mosaic_kepler
+# MAGIC df_12_05 index_id h3 5000
+
+# COMMAND ----------
+
+
diff --git a/notebooks/prototypes/grid_tiles/databricks_mosaic-0.3.11-py3-none-any.whl b/notebooks/prototypes/grid_tiles/databricks_mosaic-0.3.11-py3-none-any.whl
new file mode 100644
index 000000000..12aa723e7
Binary files /dev/null and b/notebooks/prototypes/grid_tiles/databricks_mosaic-0.3.11-py3-none-any.whl differ
diff --git a/notebooks/prototypes/grid_tiles/library.py b/notebooks/prototypes/grid_tiles/library.py
new file mode 100644
index 000000000..39e014129
--- /dev/null
+++ b/notebooks/prototypes/grid_tiles/library.py
@@ -0,0 +1,114 @@
+import shapely.geometry
+import mosaic as mos
+import pystac_client
+import planetary_computer
+import json
+import requests
+import rasterio
+from io import BytesIO
+from matplotlib import pyplot
+from rasterio.io import MemoryFile
+from rasterio.plot import show
+
+from pyspark.sql.types import *
+from pyspark.sql import functions as F
+from pyspark.sql.functions import udf
+
+def generate_cells(extent, resolution, spark, mos):
+    polygon = shapely.geometry.box(*extent, ccw=True)
+    wkt_poly = str(polygon.wkt)
+    cells = spark.createDataFrame([[wkt_poly]], ["geom"])
+    cells = cells.withColumn("grid", mos.grid_tessellateexplode("geom", F.lit(resolution)))
+    return cells
+
+@udf("array<string>")
+def get_items(geojson, datetime, collections):
+    catalog = pystac_client.Client.open(
+        "https://planetarycomputer.microsoft.com/api/stac/v1",
+        modifier=planetary_computer.sign_inplace,
+    )
+    search = catalog.search(
+        collections=collections,
+        intersects=geojson,
+        datetime=datetime
+    )
+    items = search.item_collection()
+    return [json.dumps(item.to_dict()) for item in items]
+
+@udf("array<string>")
+def get_assets(item):
+    item_dict = json.loads(item)
+    assets = item_dict["assets"]
+    return [json.dumps({**{"name": asset}, **assets[asset]}) for asset in assets]
+
+
+def get_assets_for_cells(cells_df):
+    return cells_df\
+        .repartition(200, F.rand())\
+        .withColumn("items", get_items("geojson", F.lit("2020-12-01/2020-12-31"), F.array(F.lit("landsat-c2-l2"))))\
+        .repartition(200, F.rand())\
+        .withColumn("items", F.explode("items"))\
+        .withColumn("assets", get_assets("items"))\
+        .repartition(200, F.rand())\
+        .withColumn("assets", F.explode("assets"))\
+        .withColumn("asset", F.from_json(F.col("assets"), MapType(StringType(), StringType())))\
+        .withColumn("item", F.from_json(F.col("items"), MapType(StringType(), StringType())))\
+        .withColumn("item_properties", F.from_json("item.properties", MapType(StringType(), StringType())))\
+        .withColumn("item_collection", F.col("item.collection"))\
+        .withColumn("item_bbox", F.col("item.bbox"))\
+        .withColumn("item_id", F.col("item.id"))\
+        .withColumn("stac_version", F.col("item.stac_version"))\
+        .drop("assets", "items", "item")\
+        .repartition(200, F.rand())
+
+def get_unique_hrefs(assets_df):
+    return assets_df.select(
+        "area_id",
+        "h3",
+        "asset.name",
+        "asset.href",
+        "item_id",
+        F.to_date("item_properties.datetime").alias("date")
+    ).where(
+        "name == 'swir22'"
+    ).groupBy("href", "item_id", "date")\
+        .agg(F.first("h3").alias("h3"))
+
+@udf("string")
+def download_asset(href, dir_path, filename):
+    try:
+        outpath = f"{dir_path}/{filename}"
+
# Make the actual request, set the timeout for no data to 10 seconds and enable streaming responses so we don't have to keep the large files in memory + request = requests.get(href, timeout=10, stream=True) + + # Open the output file and make sure we write in binary mode + with open(outpath, 'wb') as fh: + # Walk through the request response in chunks of 1024 * 1024 bytes, so 1MiB + for chunk in request.iter_content(1024 * 1024): + # Write the chunk to the file + fh.write(chunk) + # Optionally we can check here if the download is taking too long + return outpath + except: + return "" + + +def plot_raster(raster): + fig, ax = pyplot.subplots(1, figsize=(12, 12)) + + with MemoryFile(BytesIO(raster)) as memfile: + with memfile.open() as src: + show(src.read(1), ax=ax, cmap='pink') + pyplot.show() + +def rasterio_lambda(raster, lambda_f): + @udf("double") + def f_udf(f_raster): + with MemoryFile(BytesIO(f_raster)) as memfile: + with memfile.open() as dataset: + x = lambda_f(dataset) + return float(x) + + return f_udf(raster) \ No newline at end of file diff --git a/python/mosaic/api/aggregators.py b/python/mosaic/api/aggregators.py index 930352f92..ad4425e30 100644 --- a/python/mosaic/api/aggregators.py +++ b/python/mosaic/api/aggregators.py @@ -8,7 +8,14 @@ # Spatial aggregators # ####################### -__all__ = ["st_intersection_aggregate", "st_intersects_aggregate", "st_union_agg", "grid_cell_union_agg", "grid_cell_intersection_agg"] +__all__ = [ + "st_intersection_aggregate", + "st_intersects_aggregate", + "st_union_agg", + "grid_cell_union_agg", + "grid_cell_intersection_agg", + "rst_merge_agg" +] def st_intersection_aggregate( @@ -112,4 +119,22 @@ def grid_cell_union_agg(chips: ColumnOrName) -> Column: """ return config.mosaic_context.invoke_function( "grid_cell_union_agg", pyspark_to_java_column(chips) + ) + + +def rst_merge_agg(raster: ColumnOrName) -> Column: + """ + Returns the raster representing the aggregated union of rasters on some grid cell. + + Parameters + ---------- + raster: Column + + Returns + ------- + Column + The union raster. + """ + return config.mosaic_context.invoke_function( + "rst_merge_agg", pyspark_to_java_column(raster) ) \ No newline at end of file diff --git a/python/mosaic/api/raster.py b/python/mosaic/api/raster.py index 7b17e48a3..d7414438c 100644 --- a/python/mosaic/api/raster.py +++ b/python/mosaic/api/raster.py @@ -15,6 +15,7 @@ "rst_isempty", "rst_memsize", "rst_metadata", + "rst_merge", "rst_numbands", "rst_pixelheight", "rst_pixelwidth", @@ -36,6 +37,7 @@ "rst_srid", "rst_subdatasets", "rst_summary", + "rst_subdivide", "rst_tessellate", "rst_upperleftx", "rst_upperlefty", @@ -69,6 +71,7 @@ def rst_bandmetadata(raster: ColumnOrName, band: ColumnOrName) -> Column: pyspark_to_java_column(band) ) + def rst_georeference(raster: ColumnOrName) -> Column: """ Returns GeoTransform of the raster as a GT array of doubles. 
@@ -96,6 +99,7 @@ def rst_georeference(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_height(raster: ColumnOrName) -> Column: """ Parameters @@ -114,6 +118,7 @@ def rst_height(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_isempty(raster: ColumnOrName) -> Column: """ Parameters @@ -132,6 +137,7 @@ def rst_isempty(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_memsize(raster: ColumnOrName) -> Column: """ Parameters @@ -150,6 +156,7 @@ def rst_memsize(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_metadata(raster: ColumnOrName) -> Column: """ Parameters @@ -168,6 +175,30 @@ def rst_metadata(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + +def rst_merge(rasters: ColumnOrName) -> Column: + """ + Merges the rasters into a single raster. + The result is the path to the merged raster. + The result is stored in the checkpoint directory. + + Parameters + ---------- + rasters : Column (ArrayType(StringType)) + Paths to the rasters to merge. + + Returns + ------- + Column (StringType) + The path to the merged raster. + + """ + return config.mosaic_context.invoke_function( + "rst_merge", + pyspark_to_java_column(rasters) + ) + + def rst_numbands(raster: ColumnOrName) -> Column: """ Parameters @@ -186,6 +217,7 @@ def rst_numbands(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_pixelheight(raster: ColumnOrName) -> Column: """ Parameters @@ -204,6 +236,7 @@ def rst_pixelheight(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_pixelwidth(raster: ColumnOrName) -> Column: """ Parameters @@ -222,6 +255,7 @@ def rst_pixelwidth(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_rastertogridavg(raster: ColumnOrName, resolution: ColumnOrName) -> Column: """ The result is a 2D array of cells, where each cell is a struct of (cellID, value). @@ -246,6 +280,7 @@ def rst_rastertogridavg(raster: ColumnOrName, resolution: ColumnOrName) -> Colum pyspark_to_java_column(resolution) ) + def rst_rastertogridcount(raster: ColumnOrName, resolution: ColumnOrName) -> Column: """ The result is a 2D array of cells, where each cell is a struct of (cellID, value). @@ -270,6 +305,7 @@ def rst_rastertogridcount(raster: ColumnOrName, resolution: ColumnOrName) -> Col pyspark_to_java_column(resolution) ) + def rst_rastertogridmax(raster: ColumnOrName, resolution: ColumnOrName) -> Column: """ The result is a 2D array of cells, where each cell is a struct of (cellID, value). @@ -294,6 +330,7 @@ def rst_rastertogridmax(raster: ColumnOrName, resolution: ColumnOrName) -> Colum pyspark_to_java_column(resolution) ) + def rst_rastertogridmedian(raster: ColumnOrName, resolution: ColumnOrName) -> Column: """ The result is a 2D array of cells, where each cell is a struct of (cellID, value). @@ -318,6 +355,7 @@ def rst_rastertogridmedian(raster: ColumnOrName, resolution: ColumnOrName) -> Co pyspark_to_java_column(resolution) ) + def rst_rastertogridmin(raster: ColumnOrName, resolution: ColumnOrName) -> Column: """ The result is a 2D array of cells, where each cell is a struct of (cellID, value). @@ -342,6 +380,7 @@ def rst_rastertogridmin(raster: ColumnOrName, resolution: ColumnOrName) -> Colum pyspark_to_java_column(resolution) ) + def rst_rastertoworldcoord(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrName) -> Column: """ Computes the world coordinates of the raster pixel at the given x and y coordinates. 
@@ -366,6 +405,7 @@ def rst_rastertoworldcoord(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrNam pyspark_to_java_column(y) ) + def rst_rastertoworldcoordx(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrName) -> Column: """ Computes the world coordinates of the raster pixel at the given x and y coordinates. @@ -389,6 +429,7 @@ def rst_rastertoworldcoordx(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrNa pyspark_to_java_column(y) ) + def rst_rastertoworldcoordy(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrName) -> Column: """ Computes the world coordinates of the raster pixel at the given x and y coordinates. @@ -412,6 +453,7 @@ def rst_rastertoworldcoordy(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrNa pyspark_to_java_column(y) ) + def rst_retile(raster: ColumnOrName, tileWidth: ColumnOrName, tileHeight: ColumnOrName) -> Column: """ Retiles the raster to the given tile size. The result is a collection of new raster files. @@ -437,6 +479,7 @@ def rst_retile(raster: ColumnOrName, tileWidth: ColumnOrName, tileHeight: Column pyspark_to_java_column(tileHeight) ) + def rst_gridtiles(raster: ColumnOrName, resolution: ColumnOrName) -> Column: return config.mosaic_context.invoke_function( "rst_gridtiles", @@ -444,6 +487,7 @@ def rst_gridtiles(raster: ColumnOrName, resolution: ColumnOrName) -> Column: pyspark_to_java_column(resolution) ) + def rst_rotation(raster: ColumnOrName) -> Column: """ Computes the rotation of the raster in degrees. @@ -466,6 +510,7 @@ def rst_rotation(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_scalex(raster: ColumnOrName) -> Column: """ Computes the scale of the raster in the X direction. @@ -486,6 +531,7 @@ def rst_scalex(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_scaley(raster: ColumnOrName) -> Column: """ Computes the scale of the raster in the Y direction. @@ -506,6 +552,7 @@ def rst_scaley(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_skewx(raster: ColumnOrName) -> Column: """ Computes the skew of the raster in the X direction. @@ -526,6 +573,7 @@ def rst_skewx(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_skewy(raster: ColumnOrName) -> Column: """ Computes the skew of the raster in the Y direction. @@ -546,6 +594,7 @@ def rst_skewy(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_srid(raster: ColumnOrName) -> Column: """ Computes the SRID of the raster. @@ -567,6 +616,7 @@ def rst_srid(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_subdatasets(raster: ColumnOrName) -> Column: """ Computes the subdatasets of the raster. @@ -589,6 +639,7 @@ def rst_subdatasets(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_summary(raster: ColumnOrName) -> Column: """ Computes the summary of the raster. @@ -612,12 +663,10 @@ def rst_summary(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_tessellate(raster: ColumnOrName, resolution: ColumnOrName) -> Column: """ - Computes the summary of the raster. - The summary is a map of the statistics of the raster. - The logic is produced by gdalinfo procedure. - The result is stored as JSON. 
+ Parameters ---------- @@ -638,6 +687,30 @@ def rst_tessellate(raster: ColumnOrName, resolution: ColumnOrName) -> Column: pyspark_to_java_column(resolution) ) + +def rst_subdivide(raster: ColumnOrName, size_in_mb: ColumnOrName) -> Column: + """ + + Parameters + ---------- + raster : Column (StringType) + Path to the raster file. + size_in_mb : Column (IntegerType) + The size of the tiles in MB. + + Returns + ------- + Column (RasterTiles) + A collection of tiles of the raster. + + """ + return config.mosaic_context.invoke_function( + "rst_subdivide", + pyspark_to_java_column(raster), + pyspark_to_java_column(size_in_mb) + ) + + def rst_upperleftx(raster: ColumnOrName) -> Column: """ Computes the upper left X coordinate of the raster. @@ -659,6 +732,7 @@ def rst_upperleftx(raster: ColumnOrName) -> Column: pyspark_to_java_column(raster) ) + def rst_upperlefty(raster: ColumnOrName) -> Column: """ Computes the upper left Y coordinate of the raster. @@ -676,10 +750,11 @@ def rst_upperlefty(raster: ColumnOrName) -> Column: """ return config.mosaic_context.invoke_function( - "rst_upperlefty", - pyspark_to_java_column(raster) + "rst_upperlefty", + pyspark_to_java_column(raster) ) + def rst_width(raster: ColumnOrName) -> Column: """ Computes the width of the raster in pixels. @@ -696,10 +771,11 @@ def rst_width(raster: ColumnOrName) -> Column: """ return config.mosaic_context.invoke_function( - "rst_width", - pyspark_to_java_column(raster) + "rst_width", + pyspark_to_java_column(raster) ) + def rst_worldtorastercoord(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrName) -> Column: """ Computes the raster coordinates of the world coordinates. @@ -719,10 +795,11 @@ def rst_worldtorastercoord(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrNam """ return config.mosaic_context.invoke_function( - "rst_worldtorastercoord", - pyspark_to_java_column(raster) + "rst_worldtorastercoord", + pyspark_to_java_column(raster) ) + def rst_worldtorastercoordx(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrName) -> Column: """ Computes the raster coordinates of the world coordinates. @@ -743,10 +820,11 @@ def rst_worldtorastercoordx(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrNa """ return config.mosaic_context.invoke_function( - "rst_worldtorastercoordx", - pyspark_to_java_column(raster) + "rst_worldtorastercoordx", + pyspark_to_java_column(raster) ) + def rst_worldtorastercoordy(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrName) -> Column: """ Computes the raster coordinates of the world coordinates. 
@@ -767,6 +845,6 @@ def rst_worldtorastercoordy(raster: ColumnOrName, x: ColumnOrName, y: ColumnOrNa """ return config.mosaic_context.invoke_function( - "rst_worldtorastercoordy", - pyspark_to_java_column(raster) - ) \ No newline at end of file + "rst_worldtorastercoordy", + pyspark_to_java_column(raster) + ) diff --git a/src/main/resources/gdal/ubuntu/lib/jni/libgdalalljni.so b/src/main/resources/gdal/ubuntu/lib/jni/libgdalalljni.so new file mode 100644 index 000000000..2bd18f5f0 Binary files /dev/null and b/src/main/resources/gdal/ubuntu/lib/jni/libgdalalljni.so differ diff --git a/src/main/resources/gdal/ubuntu/lib/jni/libgdalalljni.so.30 b/src/main/resources/gdal/ubuntu/lib/jni/libgdalalljni.so.30 new file mode 100644 index 000000000..2bd18f5f0 Binary files /dev/null and b/src/main/resources/gdal/ubuntu/lib/jni/libgdalalljni.so.30 differ diff --git a/src/main/resources/gdal/ubuntu/lib/ogdi/libgdal.so b/src/main/resources/gdal/ubuntu/lib/ogdi/libgdal.so new file mode 100644 index 000000000..014d78399 Binary files /dev/null and b/src/main/resources/gdal/ubuntu/lib/ogdi/libgdal.so differ diff --git a/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so b/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so new file mode 100644 index 000000000..4a28d7c70 Binary files /dev/null and b/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so differ diff --git a/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so.30 b/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so.30 new file mode 100644 index 000000000..4a28d7c70 Binary files /dev/null and b/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so.30 differ diff --git a/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so.30.0.3 b/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so.30.0.3 new file mode 100644 index 000000000..4a28d7c70 Binary files /dev/null and b/src/main/resources/gdal/ubuntu/usr/lib/libgdal.so.30.0.3 differ diff --git a/src/main/resources/scripts/apt_setup_docker.sh b/src/main/resources/scripts/apt_setup_docker.sh new file mode 100644 index 000000000..6a19f6362 --- /dev/null +++ b/src/main/resources/scripts/apt_setup_docker.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +apt-get update +apt-get install sudo +sudo apt-get install software-properties-common \ No newline at end of file diff --git a/src/main/resources/scripts/install-gdal-databricks.sh b/src/main/resources/scripts/install-gdal-databricks.sh index d3e249f4c..f73478144 100644 --- a/src/main/resources/scripts/install-gdal-databricks.sh +++ b/src/main/resources/scripts/install-gdal-databricks.sh @@ -5,8 +5,6 @@ # Created: 2022-08-19 # -MOSAIC_GDAL_JNI_DIR="${MOSAIC_GDAL_JNI_DIR:-__DEFAULT_JNI_PATH__}" - sudo rm -r /var/lib/apt/lists/* sudo add-apt-repository main sudo add-apt-repository universe @@ -24,10 +22,4 @@ cd /usr/lib/python3/dist-packages/osgeo \ && mv _gdalconst.cpython-38-x86_64-linux-gnu.so _gdalconst.so \ && mv _ogr.cpython-38-x86_64-linux-gnu.so _ogr.so \ && mv _gnm.cpython-38-x86_64-linux-gnu.so _gnm.so \ - && mv _osr.cpython-38-x86_64-linux-gnu.so _osr.so - -# add pre-build JNI shared object to the path -# please run MosaicGDAL.copySharedObjects("/dbfs/FileStore/geospatial/mosaic/gdal/") before enabling this init script -mkdir -p /usr/lib/jni -cp "${MOSAIC_GDAL_JNI_DIR}/libgdalalljni.so" /usr/lib/jni -cp "${MOSAIC_GDAL_JNI_DIR}/libgdalalljni.so.30" /usr/lib/jni \ No newline at end of file + && mv _osr.cpython-38-x86_64-linux-gnu.so _osr.so \ No newline at end of file diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala 
b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala index adc017e1a..8381acb33 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal_raster/MosaicRasterGDAL.scala @@ -144,7 +144,11 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String) extends M val gdalInfo = GDALInfo(raster, infoOptions) val json = parse(gdalInfo).extract[Map[String, Any]] - json("STATISTICS_VALID_PERCENT").asInstanceOf[Double] == 0.0 + if (json.contains("STATISTICS_VALID_PERCENT")) { + json("STATISTICS_VALID_PERCENT").asInstanceOf[Double] == 0.0 + } else { + false + } } override def getPath: String = path @@ -308,9 +312,11 @@ object MosaicRasterGDAL extends RasterReader { * A MosaicRaster object. */ override def readRaster(inPath: String): MosaicRaster = { - val path = PathUtils.getCleanPath(inPath, inPath.endsWith(".zip")) + val isSubdataset = PathUtils.isSubdataset(inPath) + val localCopy = PathUtils.copyToTmp(inPath) + val path = PathUtils.getCleanPath(localCopy, localCopy.endsWith(".zip")) + val uuid = Murmur3.hash64(path.getBytes()) - val isSubdataset = PathUtils.isSubdataset(path) val readPath = if (isSubdataset) PathUtils.getSubdatasetPath(path) else PathUtils.getZipPath(path) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala index a8cec08a8..57ca46f92 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/clip/RasterClipByVector.scala @@ -14,8 +14,6 @@ object RasterClipByVector { val rasterCRS = raster.getRaster.GetSpatialRef() val outShortName = raster.getRaster.GetDriver().getShortName - val uuid = java.util.UUID.randomUUID() - //val resultFileName = s"/vsimem/${raster.uuid}_${uuid.toString}.${raster.getExtension}" val resultFileName = PathUtils.createTmpFilePath(raster.uuid.toString, raster.getExtension) val shapeFileName = VectorClipper.generateClipper(geometry, geomCRS, rasterCRS, geometryAPI) @@ -23,7 +21,7 @@ object RasterClipByVector { val result = GDALWarp.executeWarp( resultFileName, Seq(raster), - command = s"gdalwarp -of $outShortName -cutline $shapeFileName -crop_to_cutline -multi -wm 500 -wo NUM_THREADS=ALL_CPUS -co NUM_THREADS=ALL_CPUS" + command = s"gdalwarp -of $outShortName -cutline $shapeFileName -crop_to_cutline" ) gdal.Unlink(shapeFileName) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala index 97da78b26..d9148ed7b 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/merge/MergeRasters.scala @@ -2,7 +2,7 @@ package com.databricks.labs.mosaic.core.raster.operator.merge import com.databricks.labs.mosaic.core.raster.MosaicRaster import com.databricks.labs.mosaic.core.raster.gdal_raster.RasterCleaner -import com.databricks.labs.mosaic.core.raster.operator.gdal.{GDALBuildVRT, GDALTranslate} +import com.databricks.labs.mosaic.core.raster.operator.gdal.{GDALBuildVRT, GDALTranslate, GDALWarp} import com.databricks.labs.mosaic.utils.PathUtils object MergeRasters { @@ -17,17 +17,23 @@ object MergeRasters { val 
vrtPath = PathUtils.createTmpFilePath(vrtUUID, "vrt") val rasterPath = PathUtils.createTmpFilePath(rasterUUID, "tif") - val vrtRaster = GDALBuildVRT.executeVRT(vrtPath, rasters, command = s"gdalbuildvrt -r average -resolution highest -overwrite") +// val vrtRaster = GDALBuildVRT.executeVRT(vrtPath, rasters, command = s"gdalbuildvrt -r average -resolution highest -overwrite") +// +// // We use PACKBITS compression based on benchmarking done here: https://digital-geography.com/geotiff-compression-comparison/ +// // PACKBITS is losseless compression and is the fastest compression algorithm for GeoTIFFs, but it does not compress as well as DEFLATE. +// // https://en.wikipedia.org/wiki/PackBits +// val result = +// GDALTranslate.executeTranslate(rasterPath, vrtRaster, command = s"gdal_translate -of $outShortName -co COMPRESS=PACKBITS") - // We use PACKBITS compression based on benchmarking done here: https://digital-geography.com/geotiff-compression-comparison/ - // PACKBITS is losseless compression and is the fastest compression algorithm for GeoTIFFs, but it does not compress as well as DEFLATE. - // https://en.wikipedia.org/wiki/PackBits - val result = - GDALTranslate.executeTranslate(rasterPath, vrtRaster, command = s"gdal_translate -of $outShortName -co COMPRESS=PACKBITS") + val result2 = GDALWarp.executeWarp( + rasterPath, + rasters, + command = s"gdalwarp -r bilinear -of $outShortName -co COMPRESS=PACKBITS -overwrite" + ) - RasterCleaner.dispose(vrtRaster) +// RasterCleaner.dispose(vrtRaster) - result + result2 } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala index c8ceb1516..95fbc7af3 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala @@ -17,14 +17,13 @@ object RasterTessellate { val cellID = cell.cellIdAsLong(indexSystem) val cellRaster = raster.getRasterForCell(cellID, indexSystem, geometryAPI) cellRaster.getRaster.FlushCache() - if (cell.isCore) (true, MosaicRasterChip(cell.index, cellRaster)) - else { - ( - // If the cell is not core, we check if it has any data, if it doesn't we don't return it - cellRaster.getBands.exists { band => band.values.count(_ != band.noDataValue) + band.maskValues.count(_ != 0) != 0 }, - MosaicRasterChip(cell.index, cellRaster) - ) - } + ( + cellRaster.getBands.exists { band => + band.values.count(_ != band.noDataValue) > 0 && + band.maskValues.count(_ > 0) > 0 + } && !cellRaster.isEmpty, + MosaicRasterChip(cell.index, cellRaster) + ) }) .filter(_._1) .map(_._2) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Merge.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Merge.scala index fdc487204..94404b890 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Merge.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Merge.scala @@ -9,7 +9,7 @@ import com.databricks.labs.mosaic.functions.MosaicExpressionConfig import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.{Expression, NullIntolerant} -import org.apache.spark.sql.types.ArrayType +import org.apache.spark.sql.types.BinaryType /** * Returns a set of new 
rasters with the specified tile size (tileWidth x @@ -20,8 +20,8 @@ case class RST_Merge( expressionConfig: MosaicExpressionConfig ) extends RasterArrayExpression[RST_Merge]( rastersExpr, - null, - returnsRaster = true, + BinaryType, + returnsRaster = false, expressionConfig = expressionConfig ) with NullIntolerant @@ -34,7 +34,7 @@ case class RST_Merge( override def rasterTransform(rasters: Seq[MosaicRaster]): Any = { val result = MergeRasters.merge(rasters) rasters.foreach(RasterCleaner.dispose) - result + rasterAPI.writeRasters(Seq(result), expressionConfig.getRasterCheckpoint, BinaryType).head } } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala index 577248271..452e3fdce 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate} import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.trees.UnaryLike -import org.apache.spark.sql.types.{BinaryType, DataType, StringType} +import org.apache.spark.sql.types.{BinaryType, DataType} /** * Returns a set of new rasters with the specified tile size (tileWidth x @@ -46,13 +46,13 @@ case class RST_MergeAgg( val newRaster = rasterAPI.readRaster(rasterExpr.eval(inputRow), rasterExpr.dataType) val mergedRaster = MergeRasters.merge(Seq(partialRaster, newRaster)) - val newState = serialize(mergedRaster, returnsRaster = true, BinaryType, rasterAPI, expressionConfig) + val newState = mergedRaster.writeToBytes() RasterCleaner.dispose(partialRaster) RasterCleaner.dispose(newRaster) RasterCleaner.dispose(mergedRaster) - newState.asInstanceOf[Array[Byte]] + newState } } @@ -65,13 +65,13 @@ case class RST_MergeAgg( val leftPartial = rasterAPI.readRaster(accumulator, BinaryType) val rightPartial = rasterAPI.readRaster(input, BinaryType) val mergedRaster = MergeRasters.merge(Seq(leftPartial, rightPartial)) - val newState = serialize(mergedRaster, returnsRaster = true, BinaryType, rasterAPI, expressionConfig) + val newState = mergedRaster.writeToBytes() RasterCleaner.dispose(leftPartial) RasterCleaner.dispose(rightPartial) RasterCleaner.dispose(mergedRaster) - newState.asInstanceOf[Array[Byte]] + newState } } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Subdivide.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Subdivide.scala new file mode 100644 index 000000000..1c429db1a --- /dev/null +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Subdivide.scala @@ -0,0 +1,69 @@ +package com.databricks.labs.mosaic.expressions.raster + +import com.databricks.labs.mosaic.core.raster.MosaicRaster +import com.databricks.labs.mosaic.core.raster.gdal_raster.RasterCleaner +import com.databricks.labs.mosaic.core.raster.operator.retile.BalancedSubdivision +import com.databricks.labs.mosaic.expressions.base.{GenericExpressionFactory, WithExpressionInfo} +import com.databricks.labs.mosaic.expressions.raster.base.RasterGeneratorExpression +import com.databricks.labs.mosaic.functions.MosaicExpressionConfig +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.{Expression, NullIntolerant} +import org.apache.spark.sql.types.{BinaryType, DataType} + +/** + * Returns a set of new rasters with the specified tile size (tileWidth x + * tileHeight). + */ +case class RST_Subdivide( + rasterExpr: Expression, + sizeInMB: Expression, + expressionConfig: MosaicExpressionConfig +) extends RasterGeneratorExpression[RST_Subdivide](rasterExpr, expressionConfig) + with NullIntolerant + with CodegenFallback { + + override def dataType: DataType = BinaryType + + /** + * Returns a set of new rasters with the specified tile size (tileWidth x + * tileHeight). + */ + override def rasterGenerator(raster: MosaicRaster): Seq[MosaicRaster] = { + val targetSize = sizeInMB.eval().asInstanceOf[Int] * 1024 * 1024 + val size = raster.getMemSize + val numSplits = Math.ceil(size / targetSize).toInt + val tiles = BalancedSubdivision.splitRaster(raster, numSplits) + RasterCleaner.dispose(raster) + tiles + } + + override def children: Seq[Expression] = Seq(rasterExpr, sizeInMB) + +} + +/** Expression info required for the expression registration for spark SQL. */ +object RST_Subdivide extends WithExpressionInfo { + + override def name: String = "rst_subdivide" + + override def usage: String = + """ + |_FUNC_(expr1) - Returns a set of new rasters with same aspect ratio that are not larger than the threshold memory footprint. + |""".stripMargin + + override def example: String = + """ + | Examples: + | > SELECT _FUNC_(a, b); + | /path/to/raster_tile_1.tif + | /path/to/raster_tile_2.tif + | /path/to/raster_tile_3.tif + | ... + | """.stripMargin + + override def builder(expressionConfig: MosaicExpressionConfig): FunctionBuilder = { + GenericExpressionFactory.getBaseBuilder[RST_Subdivide](3, expressionConfig) + } + +} diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala index 20728d5fa..ffa1f2f51 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala @@ -36,6 +36,8 @@ abstract class RasterGeneratorExpression[T <: Expression: ClassTag]( with NullIntolerant with Serializable { + override def dataType: DataType = rasterExpr.dataType + val uuid: String = java.util.UUID.randomUUID().toString.replace("-", "_") /** @@ -55,7 +57,7 @@ abstract class RasterGeneratorExpression[T <: Expression: ClassTag]( * needs to be wrapped in a StructType. The actually type is that of the * structs element. */ - override def elementSchema: StructType = StructType(Array(StructField("raster", rasterExpr.dataType))) + override def elementSchema: StructType = StructType(Array(StructField("raster", dataType))) /** * The function to be overridden by the extending class. 
It is called when @@ -74,7 +76,7 @@ abstract class RasterGeneratorExpression[T <: Expression: ClassTag]( val inRaster = rasterAPI.readRaster(rasterExpr.eval(input), rasterExpr.dataType) val generatedRasters = rasterGenerator(inRaster) - val rows = rasterAPI.writeRasters(generatedRasters, checkpointPath, rasterExpr.dataType) + val rows = rasterAPI.writeRasters(generatedRasters, checkpointPath, dataType) RasterCleaner.dispose(inRaster) generatedRasters.foreach(RasterCleaner.dispose) diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 049625556..0c0315a09 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -285,6 +285,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI, rasterAP mosaicRegistry.registerExpression[RST_Subdatasets](expressionConfig) mosaicRegistry.registerExpression[RST_Summary](expressionConfig) mosaicRegistry.registerExpression[RST_Tessellate](expressionConfig) + mosaicRegistry.registerExpression[RST_Subdivide](expressionConfig) mosaicRegistry.registerExpression[RST_UpperLeftX](expressionConfig) mosaicRegistry.registerExpression[RST_UpperLeftY](expressionConfig) mosaicRegistry.registerExpression[RST_Width](expressionConfig) @@ -687,6 +688,8 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI, rasterAP def rst_tessellate(raster: Column, resolution: Column): Column = ColumnAdapter(RST_Tessellate(raster.expr, resolution.expr, expressionConfig)) def rst_tessellate(raster: String, resolution: Column): Column = ColumnAdapter(RST_Tessellate(col(raster).expr, resolution.expr, expressionConfig)) def rst_tessellate(raster: Column, resolution: Int): Column = ColumnAdapter(RST_Tessellate(raster.expr, lit(resolution).expr, expressionConfig)) + def rst_subdivide(raster: Column, sizeInMB: Column): Column = ColumnAdapter(RST_Subdivide(raster.expr, sizeInMB.expr, expressionConfig)) + def rst_subdivide(raster: Column, sizeInMB: Int): Column = ColumnAdapter(RST_Subdivide(raster.expr, lit(sizeInMB).expr, expressionConfig)) def rst_upperleftx(raster: Column): Column = ColumnAdapter(RST_UpperLeftX(raster.expr, expressionConfig)) def rst_upperleftx(raster: String): Column = ColumnAdapter(RST_UpperLeftX(lit(raster).expr, expressionConfig)) def rst_upperlefty(raster: Column): Column = ColumnAdapter(RST_UpperLeftY(raster.expr, expressionConfig)) diff --git a/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala b/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala index 5cbc84bc1..cac869a6a 100644 --- a/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala @@ -7,12 +7,18 @@ import org.gdal.gdal.gdal import java.io.{BufferedInputStream, File, PrintWriter} import java.nio.file.{Files, Paths} import scala.language.postfixOps -import scala.sys.process._ import scala.util.Try //noinspection DuplicatedCode object MosaicGDAL extends Logging { + private val usrlibsoPath = "/usr/lib/libgdal.so" + private val usrlibso30Path = "/usr/lib/libgdal.so.30" + private val usrlibso3003Path = "/usr/lib/libgdal.so.30.0.3" + private val libjnisoPath = "/lib/jni/libgdalalljni.so" + private val libjniso30Path = "/lib/jni/libgdalalljni.so.30" + private val libogdisoPath = "/lib/ogdi/libgdal.so" + // noinspection ScalaWeakerAccess val GDAL_ENABLED = 
"spark.mosaic.gdal.native.enabled" private val mosaicGDALPath = Files.createTempDirectory("mosaic-gdal") @@ -21,16 +27,14 @@ object MosaicGDAL extends Logging { def wasEnabled(spark: SparkSession): Boolean = spark.conf.get(GDAL_ENABLED, "false").toBoolean - def prepareEnvironment(spark: SparkSession, initScriptPath: String, sharedObjectsPath: String): Unit = { + def prepareEnvironment(spark: SparkSession, initScriptPath: String): Unit = { if (!wasEnabled(spark) && !isEnabled) { Try { copyInitScript(initScriptPath) - copySharedObjects(sharedObjectsPath) + copySharedObjects() } match { case scala.util.Success(_) => logInfo("GDAL environment prepared successfully.") - case scala.util.Failure(exception) => - logError("GDAL environment preparation failed.", exception) - throw exception + case scala.util.Failure(exception) => logWarning("GDAL environment preparation failed.", exception) } } } @@ -39,6 +43,7 @@ object MosaicGDAL extends Logging { if (!wasEnabled(spark) && !isEnabled) { Try { isEnabled = true + //copySharedObjects() loadSharedObjects() gdal.AllRegister() spark.conf.set(GDAL_ENABLED, "true") @@ -55,56 +60,66 @@ object MosaicGDAL extends Logging { } } - private def copySharedObjects(path: String): Unit = { - val so = readResourceBytes("/gdal/ubuntu/libgdalalljni.so") - val so30 = readResourceBytes("/gdal/ubuntu/libgdalalljni.so.30") + private def copySharedObjects(): Unit = { + + val libjniso = readResourceBytes(s"/gdal/ubuntu/$libjnisoPath") + val libjniso30 = readResourceBytes(s"/gdal/ubuntu/$libjniso30Path") + val libogdiso = readResourceBytes(s"/gdal/ubuntu/$libogdisoPath") + val usrlibso = readResourceBytes(s"/gdal/ubuntu/$usrlibsoPath") + val usrlibso30 = readResourceBytes(s"/gdal/ubuntu/$usrlibso30Path") + val usrlibso3003 = readResourceBytes(s"/gdal/ubuntu/$usrlibso3003Path") + + if (!Files.exists(Paths.get("/usr/lib"))) Files.createDirectories(Paths.get("/usr/lib")) + if (!Files.exists(Paths.get("/lib/jni"))) Files.createDirectories(Paths.get("/lib/jni")) + if (!Files.exists(Paths.get("/lib/ogdi"))) Files.createDirectories(Paths.get("/lib/ogdi")) - val usrGDALPath = Paths.get("/usr/lib/jni/") - if (!Files.exists(mosaicGDALPath)) Files.createDirectories(mosaicGDALPath) - if (!Files.exists(usrGDALPath)) Files.createDirectories(usrGDALPath) - Files.write(Paths.get(s"$mosaicGDALAbsolutePath/libgdalalljni.so"), so) - Files.write(Paths.get(s"$mosaicGDALAbsolutePath/libgdalalljni.so.30"), so30) + if (!Files.exists(Paths.get(usrlibsoPath))) Files.write(Paths.get(usrlibsoPath), usrlibso) + if (!Files.exists(Paths.get(usrlibso30Path))) Files.write(Paths.get(usrlibso30Path), usrlibso30) + if (!Files.exists(Paths.get(usrlibso3003Path))) Files.write(Paths.get(usrlibso3003Path), usrlibso3003) + if (!Files.exists(Paths.get(libjnisoPath))) Files.write(Paths.get(libogdisoPath), libjniso) + if (!Files.exists(Paths.get(libjniso30Path))) Files.write(Paths.get(libjniso30Path), libjniso30) + if (!Files.exists(Paths.get(libogdisoPath))) Files.write(Paths.get(libogdisoPath), libogdiso) - s"sudo cp $mosaicGDALAbsolutePath/libgdalalljni.so $path/libgdalalljni.so".!! - s"sudo cp $mosaicGDALAbsolutePath/libgdalalljni.so.30 $path/libgdalalljni.so.30".!! 
} // noinspection ScalaStyle private def copyInitScript(path: String): Unit = { val destPath = Paths.get(path) - if (!Files.exists(mosaicGDALPath)) Files.createDirectories(mosaicGDALPath) if (!Files.exists(destPath)) Files.createDirectories(destPath) - val w = new PrintWriter(new File(s"$mosaicGDALAbsolutePath/mosaic-gdal-init.sh")) + val w = new PrintWriter(new File(s"$path/mosaic-gdal-init.sh")) val scriptLines = readResourceLines("/scripts/install-gdal-databricks.sh") scriptLines .map { x => if (x.contains("__DEFAULT_JNI_PATH__")) x.replace("__DEFAULT_JNI_PATH__", path) else x } .foreach(x => w.println(x)) w.close() - - s"sudo cp $mosaicGDALAbsolutePath/mosaic-gdal-init.sh $path/mosaic-gdal-init.sh".!! } private def loadSharedObjects(): Unit = { + loadOrNOOP(usrlibsoPath) + loadOrNOOP(usrlibso30Path) + loadOrNOOP(usrlibso3003Path) + loadOrNOOP(libjnisoPath) + loadOrNOOP(libjniso30Path) + loadOrNOOP(libogdisoPath) + } + + private def loadOrNOOP(path: String): Unit = { try { - if (Files.exists(Paths.get("/usr/lib/libgdal.so.30"))) System.load("/usr/lib/libgdal.so.30") - if (!Files.exists(Paths.get("/usr/lib/libgdal.so"))) { - if (Files.exists(Paths.get("/usr/lib/libgdal.so.30"))) "sudo cp /usr/lib/libgdal.so.30 /usr/lib/libgdal.so".!! - } - if (Files.exists(Paths.get("/usr/lib/libgdal.so"))) System.load("/usr/lib/libgdal.so") - if (Files.exists(Paths.get("/usr/lib/libgdal.so.30.0.3"))) System.load("/usr/lib/libgdal.so.30.0.3") - if (Files.exists(Paths.get("/usr/lib/jni/libgdalalljni.so.30"))) System.load("/usr/lib/jni/libgdalalljni.so.30") - if (Files.exists(Paths.get("/usr/lib/ogdi/libgdal.so"))) System.load("/usr/lib/ogdi/libgdal.so") + //if (!Files.exists(Paths.get(path))) + System.load(path) + } catch { + case t: Throwable => logWarning(s"Failed to load $path", t) } } private def readResourceBytes(name: String): Array[Byte] = { val bis = new BufferedInputStream(getClass.getResourceAsStream(name)) - try Stream.continually(bis.read()).takeWhile(-1 !=).map(_.toByte).toArray + try { Stream.continually(bis.read()).takeWhile(-1 !=).map(_.toByte).toArray } finally bis.close() } - //noinspection SameParameterValue + // noinspection SameParameterValue private def readResourceLines(name: String): Array[String] = { val bytes = readResourceBytes(name) val lines = new String(bytes).split("\n") diff --git a/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala b/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala index c776aed95..d906c18b1 100644 --- a/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala +++ b/src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala @@ -15,7 +15,7 @@ object PathUtils { } def isSubdataset(path: String): Boolean = { - path.contains(":") + path.split(":").length == 3 } def isInMemory(path: String): Boolean = { @@ -42,20 +42,37 @@ object PathUtils { readPath } - def copyToTmp(path: String): String = { - val fileName = path.split("/").last - val extension = fileName.split("\\.").last - val inPath = getCleanPath(path, useZipPath = extension == "zip") + def copyToTmp(rawPath: String): String = { + try { + val path = + if (isSubdataset(rawPath)) { + val _ :: filePath :: _ :: Nil = rawPath.split(":").toList + filePath + } else { + rawPath + } - val randomID = UUID.randomUUID().toString - val tmpDir = Files.createTempDirectory(s"mosaic_$randomID").toFile.getAbsolutePath + val fileName = path.split("/").last + val extension = fileName.split("\\.").last + val inPath = getCleanPath(path, useZipPath = extension == "zip") - val outPath = 
s"$tmpDir/$fileName" + val randomID = UUID.randomUUID().toString + val tmpDir = Files.createTempDirectory(s"mosaic_$randomID").toFile.getAbsolutePath - Files.createDirectories(Paths.get(tmpDir)) - Files.copy(Paths.get(inPath), Paths.get(outPath)) + val outPath = s"$tmpDir/$fileName" - outPath + Files.createDirectories(Paths.get(tmpDir)) + Files.copy(Paths.get(inPath), Paths.get(outPath)) + + if (isSubdataset(rawPath)) { + val format :: _ :: subdataset :: Nil = rawPath.split(":").toList + getSubdatasetPath(s"$format:$outPath:$subdataset") + } else { + outPath + } + } catch { + case _: Throwable => rawPath + } } def createTmpFilePath(uuid: String, extension: String): String = { diff --git a/src/test/scala/com/databricks/labs/mosaic/datasource/GDALFileFormatTest.scala b/src/test/scala/com/databricks/labs/mosaic/datasource/GDALFileFormatTest.scala index 074bd266a..a4402133a 100644 --- a/src/test/scala/com/databricks/labs/mosaic/datasource/GDALFileFormatTest.scala +++ b/src/test/scala/com/databricks/labs/mosaic/datasource/GDALFileFormatTest.scala @@ -3,11 +3,11 @@ package com.databricks.labs.mosaic.datasource import com.databricks.labs.mosaic.MOSAIC_RASTER_READ_STRATEGY import com.databricks.labs.mosaic.datasource.gdal.GDALFileFormat import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.test.SharedSparkSessionGDAL import org.scalatest.matchers.must.Matchers.{be, noException} import org.scalatest.matchers.should.Matchers.an -class GDALFileFormatTest extends QueryTest with SharedSparkSession { +class GDALFileFormatTest extends QueryTest with SharedSparkSessionGDAL { test("Read netcdf with GDALFileFormat") { assume(System.getProperty("os.name") == "Linux") @@ -26,8 +26,6 @@ class GDALFileFormatTest extends QueryTest with SharedSparkSession { .load(filePath) .take(1) - spark.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes\t") - noException should be thrownBy spark.read .format("gdal") .option("driverName", "NetCDF") @@ -37,36 +35,35 @@ class GDALFileFormatTest extends QueryTest with SharedSparkSession { } - // TODO: Fix this test case, not sure what is going on with GRIBs -// test("Read grib with GDALFileFormat") { -// assume(System.getProperty("os.name") == "Linux") -// -// val grib = "/binary/grib-cams/" -// val filePath = getClass.getResource(grib).getPath -// -// noException should be thrownBy spark.read -// .format("gdal") -// .option("raster_storage", "disk") -// .option("extensions", "grib") -// .load(filePath) -// .take(1) -// -// noException should be thrownBy spark.read -// .format("gdal") -// .option("raster_storage", "disk") -// .option("extensions", "grib") -// .load(filePath) -// .take(1) -// -// noException should be thrownBy spark.read -// .format("gdal") -// .option("raster_storage", "disk") -// .option("extensions", "grib") -// .load(filePath) -// .select("metadata") -// .take(1) -// -// } + test("Read grib with GDALFileFormat") { + assume(System.getProperty("os.name") == "Linux") + + val grib = "/binary/grib-cams/" + val filePath = getClass.getResource(grib).getPath + + noException should be thrownBy spark.read + .format("gdal") + .option("raster_storage", "disk") + .option("extensions", "grib") + .load(filePath) + .take(1) + + noException should be thrownBy spark.read + .format("gdal") + .option("raster_storage", "disk") + .option("extensions", "grib") + .load(filePath) + .take(1) + + noException should be thrownBy spark.read + .format("gdal") + .option("raster_storage", "disk") + .option("extensions", 
"grib") + .load(filePath) + .select("metadata") + .take(1) + + } test("Read tif with GDALFileFormat") { assume(System.getProperty("os.name") == "Linux") diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggBehaviors.scala index eded59687..765ebd42d 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggBehaviors.scala @@ -10,7 +10,7 @@ import org.scalatest.matchers.should.Matchers._ trait RST_MergeAggBehaviors extends QueryTest { // noinspection MapGetGet - def geoReferenceBehavior(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = { + def behaviors(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = { val mc = MosaicContext.build(indexSystem, geometryAPI) mc.register() val sc = spark @@ -28,11 +28,11 @@ trait RST_MergeAggBehaviors extends QueryTest { .load("src/test/resources/modis") val gridTiles = rastersAsPaths - .withColumn("tiles", rst_gridtiles($"path", 3)) + .withColumn("tiles", rst_tessellate($"path", 3)) .select("path", "tiles") .groupBy("path") .agg( - rst_merge_agg($"tiles").as("tiles") + rst_merge_agg($"tiles.raster").as("tiles") ) .select("tiles") diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggTest.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggTest.scala index 878020ea4..7689d7685 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggTest.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAggTest.scala @@ -25,7 +25,7 @@ class RST_MergeAggTest extends QueryTest with SharedSparkSessionGDAL with RST_Me test("Testing RST_MergeAgg with manual GDAL registration (H3, JTS).") { noCodegen { assume(System.getProperty("os.name") == "Linux") - //geoReferenceBehavior(H3IndexSystem, JTS) + behaviors(H3IndexSystem, JTS) } } diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala index b298a7694..67fbc221a 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala @@ -10,7 +10,7 @@ import org.scalatest.matchers.should.Matchers._ trait RST_MergeBehaviors extends QueryTest { //noinspection MapGetGet - def geoReferenceBehavior(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = { + def behaviors(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = { val mc = MosaicContext.build(indexSystem, geometryAPI) mc.register() val sc = spark diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeTest.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeTest.scala index a766b1002..3174a5070 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeTest.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeTest.scala @@ -25,7 +25,7 @@ class RST_MergeTest extends QueryTest with SharedSparkSessionGDAL with RST_Merge test("Testing RST_Merge with manual GDAL registration (H3, JTS).") { noCodegen { assume(System.getProperty("os.name") == "Linux") - //geoReferenceBehavior(H3IndexSystem, JTS) + behaviors(H3IndexSystem, JTS) } } diff --git 
diff --git a/src/test/scala/com/databricks/labs/mosaic/test/TestMosaicGDAL.scala b/src/test/scala/com/databricks/labs/mosaic/test/TestMosaicGDAL.scala
index e486247ad..92ffca7b5 100644
--- a/src/test/scala/com/databricks/labs/mosaic/test/TestMosaicGDAL.scala
+++ b/src/test/scala/com/databricks/labs/mosaic/test/TestMosaicGDAL.scala
@@ -1,54 +1,38 @@
 package com.databricks.labs.mosaic.test
 
 import com.databricks.labs.mosaic.gdal.MosaicGDAL._
-import com.twitter.chill.Base64.InputStream
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 
-import java.io.{ByteArrayInputStream, IOException}
+import java.io.BufferedInputStream
+import java.nio.file.Files
 import scala.io.{BufferedSource, Source}
+import scala.language.postfixOps
 import scala.sys.process._
 
 object TestMosaicGDAL extends Logging {
 
-    def installGDAL(spark: SparkSession): Unit = {
-        if (!wasEnabled(spark) && !isEnabled) installGDAL(Some(spark))
+    private def readResourceBytes(): Array[Byte] = {
+        val bis = new BufferedInputStream(getClass.getResourceAsStream("/scripts/install-gdal-databricks.sh"))
+        try {
+            Stream.continually(bis.read()).takeWhile(-1 !=).map(_.toByte).toArray
+        }
+        finally bis.close()
     }
 
-    def installGDAL(spark: Option[SparkSession]): Unit = {
-        val sc = spark.map(_.sparkContext)
-        val numExecutors = sc.map(_.getExecutorMemoryStatus.size - 1)
-        val script = getScript
-        for (cmd <- script.getLines.toList) {
-            try {
-                if (!cmd.startsWith("#") || cmd.nonEmpty) cmd.!!
-                sc.map { sparkContext =>
-                    if (!sparkContext.isLocal) {
-                        sparkContext.parallelize(1 to numExecutors.get).pipe(cmd).collect
-                    }
-                }
-            } catch {
-                case e: IOException => logError(e.getMessage)
-                case e: IllegalStateException => logError(e.getMessage)
-                case e: SparkException => logError(e.getMessage)
-                case e: Throwable => logError(e.getMessage)
-            } finally {
-                script.close
-            }
-        }
+    def installGDAL(spark: SparkSession): Unit = {
+        if (!wasEnabled(spark) && !isEnabled) installGDAL()
    }
 
-    def getScript: BufferedSource = {
-        System.getProperty("os.name").toLowerCase() match {
-            case o: String if o.contains("nux") =>
-                val script = Source.fromInputStream(getClass.getResourceAsStream("/scripts/install-gdal-databricks.sh"))
-                script
-            case _ =>
-                logInfo("This method only supports Ubuntu Linux with `apt`.")
-                Source.fromInputStream(getClass.getResourceAsStream(""))
-        }
+    def installGDAL(): Unit = {
+        val bytes = readResourceBytes()
+        val tempPath = Files.createTempFile("gdal-ubuntu-install", ".sh")
+        Files.write(tempPath, bytes)
+        s"chmod +x ${tempPath.toAbsolutePath.toString}".!!
+        s"sh ${tempPath.toAbsolutePath.toString}".!!
     }
 
+
 }
diff --git a/src/test/scala/org/apache/spark/sql/test/SharedSparkSessionGDAL.scala b/src/test/scala/org/apache/spark/sql/test/SharedSparkSessionGDAL.scala
index 07df1c2f5..53cce12d0 100644
--- a/src/test/scala/org/apache/spark/sql/test/SharedSparkSessionGDAL.scala
+++ b/src/test/scala/org/apache/spark/sql/test/SharedSparkSessionGDAL.scala
@@ -27,7 +27,7 @@ trait SharedSparkSessionGDAL extends SharedSparkSession {
         Try {
             TestMosaicGDAL.installGDAL(session)
             val tempPath = Files.createTempDirectory("mosaic-gdal")
-            MosaicGDAL.prepareEnvironment(session, tempPath.toAbsolutePath.toString, "/usr/lib/jni")
+            MosaicGDAL.prepareEnvironment(session, tempPath.toAbsolutePath.toString)
             MosaicGDAL.enableGDAL(session)
         }
     }
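Note: with the two-argument prepareEnvironment signature used above, the test bootstrap reduces to installing GDAL once and pointing Mosaic at a scratch directory. A rough standalone sketch of that sequence, mirroring the Try block in SharedSparkSessionGDAL (the bootstrapGDAL wrapper itself is illustrative and not part of the patch):

import java.nio.file.Files
import scala.util.Try

import com.databricks.labs.mosaic.gdal.MosaicGDAL
import com.databricks.labs.mosaic.test.TestMosaicGDAL
import org.apache.spark.sql.SparkSession

def bootstrapGDAL(session: SparkSession): Unit = {
    Try {
        TestMosaicGDAL.installGDAL(session)                       // run the install script once per JVM
        val tempPath = Files.createTempDirectory("mosaic-gdal")   // scratch dir for the generated init script
        MosaicGDAL.prepareEnvironment(session, tempPath.toAbsolutePath.toString)
        MosaicGDAL.enableGDAL(session)                            // load shared objects and register GDAL
    }
}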