Commit f220d75

Improve data prep scripts.

milos.colic committed Aug 11, 2023
1 parent e4588ea commit f220d75
Showing 5 changed files with 167 additions and 463 deletions.
133 changes: 71 additions & 62 deletions notebooks/prototypes/grid_tiles/00 Download STACs.py
@@ -9,7 +9,7 @@

# COMMAND ----------

# MAGIC %pip install databricks-mosaic rasterio==1.3.5 --quiet gdal==3.4.3 pystac pystac_client planetary_computer
# MAGIC %pip install databricks-mosaic rasterio==1.3.5 --quiet gdal==3.4.3 pystac pystac_client planetary_computer tenacity rich

# COMMAND ----------

@@ -40,6 +40,11 @@

# COMMAND ----------

dbutils.fs.rm("/FileStore/geospatial/odin/census/", True)
dbutils.fs.mkdirs("/FileStore/geospatial/odin/census/")

# COMMAND ----------

import urllib.request
urllib.request.urlretrieve(
    "https://www2.census.gov/geo/tiger/TIGER2021/COUNTY/tl_2021_us_county.zip",
@@ -121,18 +126,6 @@

# COMMAND ----------

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

# COMMAND ----------

collections = list(catalog.get_collections())
collections

# COMMAND ----------

time_range = "2021-06-01/2021-06-07"

# COMMAND ----------
@@ -163,84 +156,104 @@

# COMMAND ----------

census_jsons = census_df\
    .where("STATEFP == 2")\
    .withColumn(
        "geom_0",
        mos.st_updatesrid("geom_0", "geom_0_srid", F.lit(4326))
    )\
    .withColumn("geom_0_srid", F.lit(4326))\
    .withColumn("area_id", F.hash("geom_0"))\
    .withColumn("geojson", mos.st_asgeojson("geom_0"))
eod_items = library.get_assets_for_cells(cell_jsons.repartition(200), time_range, "sentinel-2-l2a").cache()
eod_items.display()

# COMMAND ----------

eod_items = library.get_assets_for_cells(census_jsons.repartition(10), time_range, "sentinel-2-l2a").cache()
#eod_items.display()
# MAGIC %md
# MAGIC From this point we can easily extract the download links for items of interest.
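
As a minimal sketch of that step (hypothetical code, not part of the commit: it assumes the pyspark.sql.functions alias F and the asset.name / asset.href struct fields that the download helper below also relies on):

# pull the download links for a single band, e.g. B02
hrefs = eod_items\
    .where("asset.name == 'B02'")\
    .select("item_id", F.col("asset.href").alias("href"))
hrefs.display()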

# COMMAND ----------

eod_items = library.get_assets_for_cells(cell_jsons.repartition(200), time_range, "sentinel-2-l2a").cache()
eod_items.display()
dbutils.fs.rm("/FileStore/geospatial/odin/alaska/", True)
dbutils.fs.mkdirs("/FileStore/geospatial/odin/alaska/")

# COMMAND ----------

# MAGIC %md
# MAGIC From this point we can easily extract the download links for items of interest.
# MAGIC %sql
# MAGIC CREATE DATABASE IF NOT EXISTS odin_alaska;
# MAGIC USE odin_alaska;

# COMMAND ----------

item_name = "B01"
def download_band(eod_items, band_name):
    # Keep one asset row per (item, timestamp) pair, then filter to the requested band.
    to_download = eod_items\
        .withColumn("timestamp", F.col("item_properties.datetime"))\
        .groupBy("item_id", "timestamp")\
        .agg(
            *[F.first(cn).alias(cn) for cn in eod_items.columns if cn not in ["item_id"]]
        )\
        .withColumn("date", F.to_date("timestamp"))\
        .withColumn("href", F.col("asset.href"))\
        .where(f"asset.name == '{band_name}'")

    # Start from a clean slate for this band: drop the table and recreate the landing folder.
    spark.sql(f"DROP TABLE IF EXISTS alaska_{band_name}")
    dbutils.fs.rm(f"/FileStore/geospatial/odin/alaska/{band_name}", True)
    dbutils.fs.mkdirs(f"/FileStore/geospatial/odin/alaska/{band_name}")

    # Download each asset into DBFS under a randomised file name.
    catalog_df = to_download\
        .withColumn(
            "outputfile",
            library.download_asset(
                "href",
                F.lit(f"/dbfs/FileStore/geospatial/odin/alaska/{band_name}"),
                F.concat(F.hash(F.rand()), F.lit(".tif"))
            )
        )

    catalog_df.write\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .format("delta")\
        .saveAsTable(f"alaska_{band_name}")

to_download = eod_items\
    .withColumn("timestamp", F.col("item_properties.datetime"))\
    .groupBy("item_id", "timestamp")\
    .agg(
        *[F.first(cn).alias(cn) for cn in eod_items.columns]
    )\
    .withColumn("date", F.to_date("timestamp"))\
    .where(
        f"asset.name == '{item_name}'"
    )
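
For reference, a one-off call to the download_band helper defined above would look like this (hypothetical invocation; the band keys come from the asset listing later in this notebook):

download_band(eod_items, "B02")  # writes /dbfs/FileStore/geospatial/odin/alaska/B02 and the alaska_B02 table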

# COMMAND ----------

#to_download = library.get_unique_hrefs(eod_items, item_name="B01")
to_download.display()
import rich.table

# COMMAND ----------
region = census_df.where("STATEFP == 2").select(mos.st_asgeojson("geom_0").alias("geojson")).limit(1).collect()[0]["geojson"]

# MAGIC %fs mkdirs /FileStore/geospatial/odin/alaska/WVP
catalog = pystac_client.Client.open(
"https://planetarycomputer.microsoft.com/api/stac/v1",
modifier=planetary_computer.sign_inplace,
)

# COMMAND ----------
search = catalog.search(
    collections=["sentinel-2-l2a"],
    intersects=region,
    datetime=time_range
)

catalog_df = to_download\
    .withColumn(
        "outputfile",
        library.download_asset(
            "href",
            F.lit("/dbfs/FileStore/geospatial/odin/alaskaWVP"),
            F.concat(F.hash(F.rand()), F.lit(".tif"))
        )
    )
items = search.item_collection()

# COMMAND ----------
table = rich.table.Table("Asset Key", "Description")
for asset_key, asset in items[0].assets.items():
    table.add_row(asset_key, asset.title)

catalog_df.write.mode("overwrite").format("delta").saveAsTable("mosaic_odin_alaska_WVP")
table

# COMMAND ----------

# MAGIC %md
# MAGIC We have now downloaded all the tiles of interest and we can browse them from our delta table.
bands = []
for asset_key, asset in items[0].assets.items():
    bands.append(asset_key)

bands = [b for b in bands if b not in ["visual", "preview", "safe-manifest", "tilejson", "rendered_preview", "granule-metadata", "inspire-metadata", "product-metadata", "datastrip-metadata"]]
bands

# COMMAND ----------

# MAGIC %fs ls /FileStore/geospatial/odin/alaskaWVP
for band in bands:
    download_band(eod_items, band)

# COMMAND ----------

catalog_df = spark.read.table("mosaic_odin_alaska_B04")
# MAGIC %fs ls /FileStore/geospatial/odin/alaska/

# COMMAND ----------

catalog_df.display()
# MAGIC %fs ls /FileStore/geospatial/odin/alaska/B02

# COMMAND ----------

@@ -249,10 +262,6 @@
from rasterio.plot import show

fig, ax = pyplot.subplots(1, figsize=(12, 12))
raster = rasterio.open("""/dbfs/FileStore/geospatial/odin/dais23demo/1219604474.tif""")
raster = rasterio.open("""/dbfs/FileStore/geospatial/odin/alaska/B02/-718806860.tif""")
show(raster, ax=ax, cmap='Greens')
pyplot.show()

# COMMAND ----------


41 changes: 36 additions & 5 deletions notebooks/prototypes/grid_tiles/01 Gridded EOD Data.py
@@ -4,12 +4,11 @@

# COMMAND ----------

# MAGIC %pip install rasterio==1.3.5 --quiet gdal==3.4.3 pystac pystac_client planetary_computer
# MAGIC %pip install databricks-mosaic rasterio==1.3.5 --quiet gdal==3.4.3 pystac pystac_client planetary_computer tenacity rich

# COMMAND ----------

# disable AQE partition coalescing so the explicit repartition(200) calls below keep their parallelism
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
#spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "2048")

# COMMAND ----------

@@ -43,8 +42,14 @@

# COMMAND ----------

# MAGIC %sql
# MAGIC USE odin_alaska;
# MAGIC SHOW TABLES;

# COMMAND ----------

catalog_df = \
spark.read.table("mosaic_odin_alaska_B02")\
spark.read.table("alaska_b02")\
.withColumn("souce_band", F.lit("B02"))
catalog_df.display()

@@ -94,11 +99,37 @@

# COMMAND ----------

grid_tessellate_df.write.mode("overwrite").format("delta").save("dbfs:/FileStore/geospatial/odin/alaska_indexed_B02")
grid_tessellate_df = spark.read.format("delta").load("dbfs:/FileStore/geospatial/odin/alaska_indexed_B02")
def index_band(band_table, resolution):
    catalog_df = spark.read.table(band_table)\
        .withColumn("source_band", F.col("asset.name"))

    # Subdivide each downloaded raster into tiles of at most ~8MB in memory.
    tiles_df = catalog_df\
        .repartition(200, F.rand())\
        .withColumn("raster", mos.rst_subdivide("outputfile", F.lit(8)))\
        .withColumn("size", mos.rst_memsize("raster"))

    # Tessellate every tile to the target grid resolution and expose its cell id.
    grid_tessellate_df = tiles_df\
        .repartition(200, F.rand())\
        .withColumn("raster", mos.rst_tessellate("raster", F.lit(resolution)))\
        .withColumn("index_id", F.col("raster.index_id"))

    grid_tessellate_df.write.mode("overwrite").format("delta").saveAsTable(f"{band_table}_indexed")

# COMMAND ----------

tables_to_index = spark.sql("SHOW TABLES").where("tableName not like '%indexed'").select("tableName").collect()
tables_to_index = [tbl["tableName"] for tbl in tables_to_index]
tables_to_index

# COMMAND ----------

for tbl in tables_to_index:
index_band(tbl, 5)

# COMMAND ----------

grid_tessellate_df = spark.read.table("alaska_b02_indexed")
grid_tessellate_df.display()

# COMMAND ----------