From 79b7051a12b3cc8140ebd3d81b1c8ba280abbbf7 Mon Sep 17 00:00:00 2001 From: David Stuebe <84335963+emfdavid@users.noreply.github.com> Date: Thu, 7 Nov 2024 09:26:37 -0500 Subject: [PATCH] remaining corpus of idx mapping functionality (#523) --- Fast_aggregation.ipynb | 619 ++++++++++++++++++++++++++++++++++++ ci/environment-docs.yml | 3 +- ci/environment-py310.yml | 3 +- ci/environment-py311.yml | 3 +- ci/environment-py312.yml | 3 +- dynamicgribchunking.ipynb | 563 ++++++++++++++++++++++++++++++++ kerchunk/_grib_idx.py | 423 +++++++++++++++++++++++- kerchunk/grib2.py | 18 +- kerchunk/tests/test_grib.py | 5 +- pyproject.toml | 3 +- 10 files changed, 1616 insertions(+), 27 deletions(-) create mode 100644 Fast_aggregation.ipynb create mode 100644 dynamicgribchunking.ipynb diff --git a/Fast_aggregation.ipynb b/Fast_aggregation.ipynb new file mode 100644 index 00000000..2b8091b9 --- /dev/null +++ b/Fast_aggregation.ipynb @@ -0,0 +1,619 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Fast NODD GRIB Aggregations" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Overview\n", + "\n", + "In this tutorial we are going to demonstrate building fast kerchunk aggregations of NODD grib2 weather forecasts. This workflow primarily involves [xarray-datatree](https://xarray-datatree.readthedocs.io/en/latest/), [pandas](https://pandas.pydata.org/) and the new `grib_tree` function released in kerchunk v0.2.3.\n", + "\n", + "\n", + "### About the Dataset\n", + "\n", + "In this demo we will be looking at GRIB2 files generated by [**NOAA Global Ensemble Forecast System (GEFS)**](https://www.ncei.noaa.gov/products/weather-climate-models/global-ensemble-forecast). This dataset is a weather forecast model made up of 21 separate forecasts, or ensemble members. GEFS has global coverage and is produced four times a day with forecasts going out to 16 days. It is updated 4 times a day, every 6 hours starting at midnight.\n", + "\n", + "More information on this dataset can be found [here](https://registry.opendata.aws/noaa-gefs)\n", + "\n", + "\n", + "## Prerequisites\n", + "| Concepts | Importance | Notes |\n", + "| --- | --- | --- |\n", + "| [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |\n", + "| [Pandas Tutorial](https://foundations.projectpythia.org/core/pandas/pandas.html#) | Required | Core |\n", + "| [Kerchunk and Xarray-Datatree](https://projectpythia.org/kerchunk-cookbook/notebooks/using_references/Datatree.html) | Required | IO |\n", + "| [Xarray-Datatree Overview](https://xarray-datatree.readthedocs.io/en/latest/quick-overview.html)| Required | IO |\n", + "\n", + "- **Time to learn**: 30 minutes\n", + "\n", + "## Motivation\n", + "\n", + "As we know that **kerchunk** provides a unified way to represent a variety of chunked, compressed data formats (e.g. NetCDF/HDF5, GRIB2, TIFF, …) by generating *references*. This task flow has ability to build large aggregations from **NODD grib forecasts**\n", + "in a fraction of the time using the `idx files`." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "import logging\n", + "import importlib\n", + "importlib.reload(logging)\n", + "logging.basicConfig(\n", + " format=\"%(asctime)s.%(msecs)03dZ %(processName)s %(threadName)s %(levelname)s:%(name)s:%(message)s\",\n", + " datefmt=\"%Y-%m-%dT%H:%M:%S\",\n", + " level=logging.WARNING,\n", + ")\n", + "\n", + "logger = logging.getLogger(\"juypter\")\n", + "\n", + "\n", + "import copy\n", + "import fsspec\n", + "import pandas as pd\n", + "import xarray as xr\n", + "import datetime\n", + "from kerchunk.grib2 import (\n", + " AggregationType,\n", + " build_idx_grib_mapping,\n", + " extract_datatree_chunk_index,\n", + " grib_tree,\n", + " map_from_index,\n", + " parse_grib_idx,\n", + " reinflate_grib_store,\n", + " scan_grib,\n", + " strip_datavar_chunks,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Building the Aggregation directly from the GRIB files\n", + "\n", + "We are using the newly implemented Kerchunk `grib_tree` function to build a hierarchical data model from a set of scanned grib messages.This data model can be opened directly using either zarr or xarray datatree. *This way of building the aggregation is very slow*. Here we're going to use xarray-datatree to open and view it:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "s3_files = [\n", + " \"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006\",\n", + " \"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af012\",\n", + " \"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af018\",\n", + " \"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af024\",\n", + " \"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af030\",\n", + " \"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af036\",\n", + " \"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af042\",\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# converting the references into the hierarchical datamodel\n", + "grib_tree_store = grib_tree(\n", + " [\n", + " group\n", + " for f in s3_files\n", + " for group in scan_grib(f, storage_options=dict(anon=True))\n", + " ],\n", + " remote_options=dict(anon=True),\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Transforming the output to datatree to view it. This tree model the variables\n", + "s3_dt = xr.open_datatree(\n", + " fsspec.filesystem(\n", + " \"reference\",\n", + " fo=grib_tree_store,\n", + " remote_protocol=\"s3\",\n", + " remote_options={\"anon\": True},\n", + " ).get_mapper(\"\"),\n", + " engine=\"zarr\",\n", + " consolidated=False,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# In this tree model, the variables are organized into hierarchical groups, first by \"stepType\" and then by \"typeOfLevel.\"\n", + "s3_dt" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> **Note**: If trying out this notebook, the above way of building the aggregation will take few minutes for this `GEFS` S3 repository. 
But in general, it takes more time, depending on the size of the GRIB files."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Building the aggregation faster with `idx` files\n",
+    "\n",
+    "This method of aggregating GRIB files requires that each GRIB file be accompanied by its corresponding `idx` file in text format. These files, also called index files, contain key metadata about the GRIB messages in each file. That metadata is then used to build a reference index, which drives the aggregation. \n",
+    "\n",
+    "The advantage is that building the aggregation this way doesn't require scanning the whole archive of GRIB files, only a single file. \n",
+    "\n",
+    "### Index DataFrame made from a single GRIB file\n",
+    "\n",
+    "Here is what the contents of an `idx` file look like.\n",
+    "\n",
+    "```\n",
+    "1:0:d=2017010100:HGT:10 mb:12 hour fcst:ENS=low-res ctl\n",
+    "2:48163:d=2017010100:TMP:10 mb:12 hour fcst:ENS=low-res ctl\n",
+    "3:68112:d=2017010100:RH:10 mb:12 hour fcst:ENS=low-res ctl\n",
+    "4:79092:d=2017010100:UGRD:10 mb:12 hour fcst:ENS=low-res ctl\n",
+    "5:102125:d=2017010100:VGRD:10 mb:12 hour fcst:ENS=low-res ctl\n",
+    "6:122799:d=2017010100:HGT:50 mb:12 hour fcst:ENS=low-res ctl\n",
+    "7:178898:d=2017010100:TMP:50 mb:12 hour fcst:ENS=low-res ctl\n",
+    "8:201799:d=2017010100:RH:50 mb:12 hour fcst:ENS=low-res ctl\n",
+    "9:224321:d=2017010100:UGRD:50 mb:12 hour fcst:ENS=low-res ctl\n",
+    "10:272234:d=2017010100:VGRD:50 mb:12 hour fcst:ENS=low-res ctl\n",
+    "11:318288:d=2017010100:HGT:100 mb:12 hour fcst:ENS=low-res ctl\n",
+    "12:379010:d=2017010100:TMP:100 mb:12 hour fcst:ENS=low-res ctl\n",
+    "13:405537:d=2017010100:RH:100 mb:12 hour fcst:ENS=low-res ctl\n",
+    "14:441517:d=2017010100:UGRD:100 mb:12 hour fcst:ENS=low-res ctl\n",
+    "15:497421:d=2017010100:VGRD:100 mb:12 hour fcst:ENS=low-res ctl\n",
+    "```\n",
+    "\n",
+    "The general format of an `idx` entry is: **index:offset:date_with_runtime:variable:level:forecast_time:**.\n",
+    "\n",
+    "The fields are separated by \":\" (colon), and we parse them into a `DataFrame` before building the index. \n",
+    "\n",
+    "\n",
+    "> **Note**: This way of building the reference index only works for a particular **horizon** file, irrespective of the run time of the model, because the messages from the same time horizon have an identical structure."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# converting the idx data into a dataframe\n",
+    "idxdf = parse_grib_idx(\n",
+    "    #\"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006\",\n",
+    "    \"s3://noaa-gefs-pds/gefs.20230101/00/atmos/pgrb2sp25/geavg.t00z.pgrb2s.0p25.f006\",\n",
+    "    storage_options=dict(anon=True),\n",
+    ")\n",
+    "idxdf.head()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Building a mapping between the index data and GRIB metadata\n",
+    "\n",
+    "Creating the reference index, or **k_index**, first involves generating a single mapping from the grib/zarr metadata (the `grib_tree` output) to the attributes in the idx files. These mappings are unique to each time horizon, so while building the index we need a separate mapping for the 1 hour forecast, the 2 hour forecast and so on. \n",
+    "\n",
+    "This mapping can be made from any arbitrary GRIB file, provided it belongs to the **same time horizon** as the files we're trying to index. \n",
+    "\n",
+    "We'll start by examining the GRIB data. The metadata that we'll be extracting is static in nature; the short aside below shows why the per-file `idx` byte offsets are the only non-static piece we need. 
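(Aside, not from the original notebook: the reason a tiny `.idx` file is enough to reference a multi-gigabyte GRIB file is that consecutive byte offsets give the start and length of every message, which is exactly what a kerchunk reference stores. The hypothetical snippet below, reusing the `idx` rows shown earlier, is only meant to make that idea concrete; `parse_grib_idx` does this bookkeeping for us.)

```python
# Illustrative only: turn consecutive .idx offsets into (offset, length) byte ranges.
idx_lines = [
    "1:0:d=2017010100:HGT:10 mb:12 hour fcst:ENS=low-res ctl",
    "2:48163:d=2017010100:TMP:10 mb:12 hour fcst:ENS=low-res ctl",
    "3:68112:d=2017010100:RH:10 mb:12 hour fcst:ENS=low-res ctl",
]
offsets = [int(line.split(":")[1]) for line in idx_lines]
for line, start, end in zip(idx_lines, offsets, offsets[1:] + [None]):
    attrs = line.split(":", 2)[2]  # everything after "index:offset:"
    length = "runs to the next message / end of file" if end is None else end - start
    print(f"{attrs:<55} offset={start} length={length}")
```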
We're going to use a single node by [accessing](https://projectpythia.org/kerchunk-cookbook/notebooks/using_references/Datatree.html#accessing-the-datatree) it with datatree."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "s3_dt.groups"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Parsing the grib metadata from a single datatree node and converting it into a dataframe\n",
+    "grib_df = extract_datatree_chunk_index(s3_dt[\"sulwrf/avg/nominalTop\"], grib_tree_store)\n",
+    "grib_df.head()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "> **Note**: The step above is part of the mapping creation; the call to `extract_datatree_chunk_index` is handled inside the `build_idx_grib_mapping` function."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# creating a mapping for a single horizon file which is to be used later\n",
+    "mapping = build_idx_grib_mapping(\n",
+    "    #\"s3://noaa-gefs-pds/gefs.20170101/00/geavg00.t00z.pgrb2f006\",\n",
+    "    \"s3://noaa-gefs-pds/gefs.20230101/00/atmos/pgrb2sp25/geavg.t00z.pgrb2s.0p25.f006\",\n",
+    "    storage_options=dict(anon=True),\n",
+    "    validate=True,\n",
+    ")\n",
+    "mapping.head()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "> **Note**: Reference indexes can be combined across many horizons, but **each horizon must be indexed separately**."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Building the index\n",
+    "\n",
+    "Now, if we parse the runtime from the idx file along with the other attributes, we can build a fully compatible kerchunk index (k_index) for that \n",
+    "**particular file**. Before creating the index, we need to clean the mapping and index dataframes, because some variables tend to contain duplicate values, as demonstrated below. "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# cleaning the mapping\n",
+    "mapping.loc[mapping[\"attrs\"].duplicated(keep=False), :]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# this step will be performed for every idx dataframe, reusing the \"mapping\" dataframe created above;\n",
+    "# the timestamp is the run time of the file the idx dataframe was parsed from\n",
+    "mapped_index = map_from_index(\n",
+    "    pd.Timestamp(\"2023-01-01T00\"),\n",
+    "    mapping.loc[~mapping[\"attrs\"].duplicated(keep=\"first\"), :],\n",
+    "    idxdf.loc[~idxdf[\"attrs\"].duplicated(keep=\"first\"), :],\n",
+    ")\n",
+    "mapped_index.head()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Final step: building the aggregation\n",
+    "\n",
+    "For the final step, we create an index for each GRIB file covering a two-month period, from `2023-01-01` to `2023-02-28`, using the 6-hour horizon files. The generated index can then be combined with the indexes of other time horizons and stored for later use. "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%time\n",
+    "mapped_index_list = []\n",
+    "\n",
+    "deduped_mapping = mapping.loc[~mapping[\"attrs\"].duplicated(keep=\"first\"), :]\n",
+    "\n",
+    "# this loop is written out manually because the runtime and forecast time layout vary between models, i.e. 
GEFS, GFS, HRRR etc.\n", + "for date in pd.date_range(\"2023-01-01\", \"2023-02-28\"):\n", + " for runtime in range(0, 24, 6):\n", + " #fname = f\"s3://noaa-gefs-pds/gefs.{date.strftime('%Y%m%d')}/{runtime:02}/gec00.t{runtime:02}z.pgrb2af006\"\n", + " fname = f\"s3://noaa-gefs-pds/gefs.{date.strftime('%Y%m%d')}/{runtime:02}/atmos/pgrb2sp25/geavg.t{runtime:02}z.pgrb2s.0p25.f006\"\n", + "\n", + " idxdf = parse_grib_idx(basename=fname, storage_options=dict(anon=True))\n", + " \n", + " mapped_index = map_from_index(\n", + " pd.Timestamp(date + datetime.timedelta(hours=runtime)),\n", + " deduped_mapping,\n", + " idxdf.loc[~idxdf[\"attrs\"].duplicated(keep=\"first\"), :],\n", + " )\n", + " \n", + " mapped_index_list.append(mapped_index)\n", + "\n", + "s3_kind = pd.concat(mapped_index_list)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> **Tip**: To confirm the above step, check out this [notebook](https://gist.github.com/Anu-Ra-g/efa01ad1c274c1bd1c14ee01666caa77)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using the aggregation\n", + "\n", + "The difference between .idx and Kerchunk index that we built is that the former indexes the GRIB messages and the latter indexes the variables in those messages, in a time horizon. Now we'll need a tree model from grib_tree function to reinflate index (variables in the messages). The important point to note here is that the tree model should be made from the GRIB files of the repository that we are indexing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "grib_tree_store = grib_tree(\n", + " scan_grib(\n", + " #\"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006\",\n", + " \"s3://noaa-gefs-pds/gefs.20230101/00/atmos/pgrb2sp25/geavg.t00z.pgrb2s.0p25.f006\",\n", + " storage_options=dict(anon=True),\n", + " ),\n", + " remote_options=dict(anon=True),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then we need to run that tree model through `strip_datavar_chunks` function, which will strip the tree of the variables *in place*. This step is optional" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "deflated_s3_grib_tree_store = copy.deepcopy(\n", + " grib_tree_store\n", + ") # not affecting the original tree model\n", + "strip_datavar_chunks(deflated_s3_grib_tree_store)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "After stripping the tree model, we need to introduce two new axes: the `runtime` used for mapping and `date_range` for indexing the new reinflated tree. 
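Before building those axes, one practical aside (not in the original flow here, but it mirrors the companion GFS notebook in this repository): because the deflated store only has to be created once, it can be persisted with `write_store` and reloaded later with `read_store`, so the slow `scan_grib`/`grib_tree` step never needs to be repeated. A minimal sketch:

```python
from kerchunk.grib2 import read_store, write_store

# Persist the deflated (static metadata only) store once...
write_store("./", deflated_s3_grib_tree_store)
# ...and reload it in a later session instead of re-running scan_grib/grib_tree.
deflated_s3_grib_tree_store = read_store("./")
```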
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "axes = [\n", + " pd.Index(\n", + " [\n", + " pd.timedelta_range(\n", + " start=\"0 hours\", end=\"6 hours\", freq=\"6h\", closed=\"right\", name=\"6 hour\"\n", + " ),\n", + " ],\n", + " name=\"step\",\n", + " ),\n", + " pd.date_range(\n", + " \"2023-01-01T00:00\", \"2023-02-28T18:00\", freq=\"360min\", name=\"valid_time\"\n", + " ),\n", + "]\n", + "axes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "s3_kind.loc[s3_kind.varname.isin([\"sdswrf\"])].head(20)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now for reinflating, we'll be needing the aggregation types which are: `horizon`, `valid_time`, `run_time` and `best_available`. We will also be needing the variable(s) that we are reinflating. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "s3_store = reinflate_grib_store(\n", + " axes=axes,\n", + " aggregation_type=AggregationType.HORIZON,\n", + " chunk_index=s3_kind.loc[s3_kind.varname.isin([\"sulwrf\", \"prmsl\", \"sdswrf\"])],\n", + " zarr_ref_store=deflated_s3_grib_tree_store,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Viewing the new subset of the datatree\n", + "\n", + "In this step, we can view the new subset as a `datatree` model. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "s3_dt_subset = xr.open_datatree(\n", + " fsspec.filesystem(\n", + " \"reference\", fo=s3_store, remote_protocol=\"s3\", remote_options={\"anon\": True}\n", + " ).get_mapper(\"\"),\n", + " engine=\"zarr\",\n", + " consolidated=False,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "s3_dt_subset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "s3_dt_subset.groups" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "s3_dt_subset.sdswrf.avg.surface.sdswrf[0,3,:,:].plot(figsize=(12,8))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "da = s3_dt_subset.sulwrf.avg.nominalTop.sulwrf.isel(model_horizons=0)\n", + "da" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Create an index for time" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "da= da.rename({'valid_times':'time'}).drop_vars(['step'])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "da = da.set_index(time='time')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "da" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "da.sel(time='2023-01-02', method='nearest')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Try extracting some data. \n", + "The results from the cell below show lot's of NaN and what look like integer values. This doesn't seem like what would be expected for \"Surface upward long-wave radiation flux\", right?" 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "da[:,90,180].load()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/ci/environment-docs.yml b/ci/environment-docs.yml index 71f8ebb9..9c7ff99c 100644 --- a/ci/environment-docs.yml +++ b/ci/environment-docs.yml @@ -6,8 +6,7 @@ dependencies: - python=3.10 - dask - zarr - - xarray - - xarray-datatree + - xarray>=2024.10.0 - h5netcdf - h5py - pandas diff --git a/ci/environment-py310.yml b/ci/environment-py310.yml index 3f7f7a2e..970acd42 100644 --- a/ci/environment-py310.yml +++ b/ci/environment-py310.yml @@ -6,8 +6,7 @@ dependencies: - python=3.10 - dask - zarr - - xarray - - xarray-datatree + - xarray>=2024.10.0 - h5netcdf - h5py - pandas diff --git a/ci/environment-py311.yml b/ci/environment-py311.yml index d8f5be81..20604aa8 100644 --- a/ci/environment-py311.yml +++ b/ci/environment-py311.yml @@ -6,8 +6,7 @@ dependencies: - python=3.11 - dask - zarr - - xarray - - xarray-datatree + - xarray>=2024.10.0 - h5netcdf - h5py - pandas diff --git a/ci/environment-py312.yml b/ci/environment-py312.yml index 854c80b6..0f8f69d5 100644 --- a/ci/environment-py312.yml +++ b/ci/environment-py312.yml @@ -6,8 +6,7 @@ dependencies: - python=3.12 - dask - zarr - - xarray - - xarray-datatree + - xarray>=2024.10.0 - h5netcdf - h5py - pandas diff --git a/dynamicgribchunking.ipynb b/dynamicgribchunking.ipynb new file mode 100644 index 00000000..26bd8de7 --- /dev/null +++ b/dynamicgribchunking.ipynb @@ -0,0 +1,563 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "n8nQijCi9wVu" + }, + "source": [ + "# GRIB2 NWP Archive aggretation using .idx files\n", + "\n", + "Work with NODD GFS data on GCP (theoretically works with AWS as well)\n", + "\n", + "1) Demonstrate building a DataTree, Index File mapping and Static Metdata data file. Use the mapping to rapidly build a on month dataset.\n", + "2) Demonstrate using zarr chunk_getitems hack to parallelize getting a timeseries\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "326f3vGe_HHb" + }, + "source": [ + "## Import some stuff..." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "3DQi8Hvg_GSD" + }, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n", + "\n", + "import logging\n", + "import importlib\n", + "\n", + "importlib.reload(logging)\n", + "logging.basicConfig(\n", + " format=\"%(asctime)s.%(msecs)03dZ %(processName)s %(threadName)s %(levelname)s:%(name)s:%(message)s\",\n", + " datefmt=\"%Y-%m-%dT%H:%M:%S\",\n", + " level=logging.WARNING,\n", + ")\n", + "\n", + "logger = logging.getLogger(\"juypter\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "NzO2IOsm_MfL" + }, + "outputs": [], + "source": [ + "import datetime\n", + "import copy\n", + "import xarray as xr\n", + "import numpy as np\n", + "import pandas as pd\n", + "import fsspec\n", + "import kerchunk\n", + "from kerchunk.grib2 import (\n", + " grib_tree, scan_grib, extract_datatree_chunk_index, strip_datavar_chunks, \n", + " reinflate_grib_store, AggregationType, read_store, write_store, parse_grib_idx,\n", + " build_idx_grib_mapping, map_from_index\n", + ")\n", + "import gcsfs\n", + "\n", + "pd.set_option('display.max_columns', None)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ueTGv6_g6plE" + }, + "source": [ + "## Extract the zarr store metadata\n", + "\n", + "\n", + "Pick a file, any file... Must be on GCS so that coords use the same file store as the data vars" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "TvOmUkr-6plE", + "outputId": "e61f1f20-0c43-40bb-ca6a-4c5076e76a9f" + }, + "outputs": [], + "source": [ + "# Pick two files to build a grib_tree with the correct dimensions\n", + "gfs_files = [\n", + " \"gs://global-forecast-system/gfs.20230928/00/atmos/gfs.t00z.pgrb2.0p25.f000\",\n", + " \"gs://global-forecast-system/gfs.20230928/00/atmos/gfs.t00z.pgrb2.0p25.f001\"\n", + "]\n", + "\n", + "# This operation reads two of the large Grib2 files from GCS\n", + "# scan_grib extracts the zarr kerchunk metadata for each individual grib message\n", + "# grib_tree builds a zarr/xarray compatible hierarchical view of the dataset\n", + "gfs_grib_tree_store = grib_tree([group for f in gfs_files for group in scan_grib(f)])\n", + "# it is slow even in parallel because it requires a huge amount of IO\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 186 + }, + "id": "IEOG_v8O5KVC", + "outputId": "4b8e9b14-efc2-440e-9953-bd15df58bcc3" + }, + "outputs": [], + "source": [ + "# The grib_tree can be opened directly using either zarr or xarray datatree\n", + "# But this is too slow to build big aggregations\n", + "gfs_dt = xr.open_datatree(fsspec.filesystem(\"reference\", fo=gfs_grib_tree_store).get_mapper(\"\"), engine=\"zarr\", consolidated=False)\n", + "gfs_dt\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "R0Ew4zOIEoIw" + }, + "source": [ + "## Separating static metadata from the chunk indexes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 423 + }, + "id": "ukcrJ9fi5NSS", + "outputId": "0c9b6d89-883c-4fad-d891-6c62973c115f" + }, + "outputs": [], + "source": [ + "# The key metadata associated with each grib message can be extracted into a table\n", + "gfs_kind = extract_datatree_chunk_index(gfs_dt, gfs_grib_tree_store, 
grib=True)\n", + "gfs_kind" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "hX7Hh3aB6plF", + "outputId": "ede04c84-cc23-4260-f8c2-6973f2ee56bc" + }, + "outputs": [], + "source": [ + "# While the static zarr metadata associated with the dataset can be seperated - created once.\n", + "deflated_gfs_grib_tree_store = copy.deepcopy(gfs_grib_tree_store)\n", + "strip_datavar_chunks(deflated_gfs_grib_tree_store)\n", + "\n", + "write_store(\"./\", deflated_gfs_grib_tree_store)\n", + "\n", + "print(\"Original references: \", len(gfs_grib_tree_store[\"refs\"]))\n", + "print(\"Stripped references: \", len(deflated_gfs_grib_tree_store[\"refs\"]))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "WlbvPP4G6plF" + }, + "source": [ + "## Building it faster\n", + "\n", + "Okay that was fun - I promise you can recombine these pieces but it still takes the same amount of time to run scan_grib.\n", + "\n", + "The k(erchunk) index data looks a lot like the idx files that are present for every grib file in NODD's GCS archive though...\n", + "\n", + "\n", + "```\n", + "1:0:d=2023092800:PRMSL:mean sea level:1 hour fcst:\n", + "2:990253:d=2023092800:CLWMR:1 hybrid level:1 hour fcst:\n", + "3:1079774:d=2023092800:ICMR:1 hybrid level:1 hour fcst:\n", + "4:1332540:d=2023092800:RWMR:1 hybrid level:1 hour fcst:\n", + "5:1558027:d=2023092800:SNMR:1 hybrid level:1 hour fcst:\n", + "6:1638489:d=2023092800:GRLE:1 hybrid level:1 hour fcst:\n", + "7:1673516:d=2023092800:REFD:1 hybrid level:1 hour fcst:\n", + "8:2471710:d=2023092800:REFD:2 hybrid level:1 hour fcst:\n", + "9:3270627:d=2023092800:REFC:entire atmosphere:1 hour fcst:\n", + "10:4144435:d=2023092800:VIS:surface:1 hour fcst:\n", + "```\n", + "\n", + "But the metadata isn't quiet the same... they have mangled the attributes in the : seperated attributes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 628 + }, + "id": "XGNMVeWrHzJD", + "outputId": "2c270e66-38ef-4acd-bc8f-62e3b1cd4e2b" + }, + "outputs": [], + "source": [ + "# We can pull this out into a dataframe, that starts to look a bit like what we got above extracted from the actual grib files\n", + "# But this method runs in under a second reading a file that is less than 100k\n", + "idxdf = parse_grib_idx(\n", + " basename=\"gs://global-forecast-system/gfs.20230901/00/atmos/gfs.t00z.pgrb2.0p25.f006\"\n", + ")\n", + "idxdf" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 275 + }, + "id": "leAb3jXChBVQ", + "outputId": "d6d842d0-e575-4c29-cd72-a4f999a946ed" + }, + "outputs": [], + "source": [ + "# Unfortunately, some accumulation variables have duplicate attributes making them\n", + "# indesinguishable from the IDX file\n", + "idxdf.loc[idxdf['attrs'].duplicated(keep=False), :]\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 943 + }, + "id": "6ws0cK3gHzMK", + "outputId": "62e74a5a-86db-4a71-93d8-120515108b2f" + }, + "outputs": [], + "source": [ + "# What we need is a mapping from our grib/zarr metadata to the attributes in the idx files\n", + "# They are unique for each time horizon e.g. you need to build a unique mapping for the 1 hour\n", + "# forecast, the 2 hour forecast... 
the 48 hour forecast.\n", + "\n", + "# let's make one for the 6 hour horizon. This requires reading both the grib and the idx file,\n", + "# mapping the data for each grib message in order\n", + "mapping = build_idx_grib_mapping(\n", + " basename=\"gs://global-forecast-system/gfs.20230928/00/atmos/gfs.t00z.pgrb2.0p25.f006\",\n", + ")\n", + "mapping" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 614 + }, + "id": "CzU9fWn5HzPS", + "outputId": "770783d1-846a-47ce-c01f-02eea30939ef" + }, + "outputs": [], + "source": [ + "# Now if we parse the RunTime from the idx file name `gfs.20230901/00/`\n", + "# We can build a fully compatible k_index\n", + "mapped_index = map_from_index(\n", + " pd.Timestamp(\"2023-09-01T00\"),\n", + " mapping.loc[~mapping[\"attrs\"].duplicated(keep=\"first\"), :],\n", + " idxdf.loc[~idxdf[\"attrs\"].duplicated(keep=\"first\"), :]\n", + ")\n", + "mapped_index" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "mapping.loc[mapping.varname == \"sdswrf\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 770 + }, + "id": "Ny9xKXjP7n57", + "outputId": "15a5793b-a9df-4065-a403-1a35aa5c7cf8" + }, + "outputs": [], + "source": [ + "mapped_index_list = []\n", + "\n", + "deduped_mapping = mapping.loc[~mapping[\"attrs\"].duplicated(keep=\"first\"), :]\n", + "for date in pd.date_range(\"2023-09-01\", \"2023-09-30\"):\n", + " for runtime in range(0,24,6):\n", + " horizon=6\n", + " fname=f\"gs://global-forecast-system/gfs.{date.strftime('%Y%m%d')}/{runtime:02}/atmos/gfs.t{runtime:02}z.pgrb2.0p25.f{horizon:03}\"\n", + "\n", + " idxdf = parse_grib_idx(\n", + " basename=fname\n", + " )\n", + "\n", + " mapped_index = map_from_index(\n", + " pd.Timestamp( date + datetime.timedelta(hours=runtime)),\n", + " deduped_mapping,\n", + " idxdf.loc[~idxdf[\"attrs\"].duplicated(keep=\"first\"), :],\n", + " )\n", + " mapped_index_list.append(mapped_index)\n", + "\n", + "gfs_kind = pd.concat(mapped_index_list)\n", + "gfs_kind\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "VF60ETYemD6N" + }, + "source": [ + "## We just aggregated a 120 GFS grib files in 18 seconds!\n", + "\n", + "Lets build it back into a data tree!\n", + "\n", + "The reinflate_grib_store interface is pretty opaque but allows building any slice of an FMRC. A good area for future improvement, but for now, since\n", + "we have just a single 6 hour horizon slice let's build that..." 
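(A small practical aside, not in the original notebook: since the loop above is the expensive part, the resulting k_index is worth persisting so it never has to be rebuilt. A sketch, assuming a parquet engine such as pyarrow is installed; the file name is arbitrary.)

```python
# Save the kerchunk index for reuse in later sessions...
gfs_kind.to_parquet("gfs_f006_kindex.parquet")
# ...and reload it later without touching the idx files again:
# gfs_kind = pd.read_parquet("gfs_f006_kindex.parquet")
```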
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "IW1M7go56plF", + "outputId": "17c2fa69-81f8-4d21-f260-4233e902a6ac" + }, + "outputs": [], + "source": [ + "axes = [\n", + " pd.Index(\n", + " [\n", + " pd.timedelta_range(start=\"0 hours\", end=\"6 hours\", freq=\"6h\", closed=\"right\", name=\"6 hour\"),\n", + " ],\n", + " name=\"step\"\n", + " ),\n", + " pd.date_range(\"2023-09-01T06:00\", \"2023-10T00:00\", freq=\"360min\", name=\"valid_time\")\n", + "]\n", + "axes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "_M9oH4TJ6plG", + "outputId": "c6043655-7c8b-4981-cb5f-f5d6a903315b" + }, + "outputs": [], + "source": [ + "# It is fast to rebuild the datatree - but lets pull out two varables to look at...\n", + "\n", + "# If you skipped building the deflated store, read it here.\n", + "#deflated_gfs_grib_tree_store = read_store(\"./\")\n", + "\n", + "gfs_store = reinflate_grib_store(\n", + " axes=axes,\n", + " aggregation_type=AggregationType.HORIZON,\n", + " chunk_index=gfs_kind.loc[gfs_kind.varname.isin([\"sdswrf\", \"t2m\"])],\n", + " zarr_ref_store=deflated_gfs_grib_tree_store\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 1000 + }, + "id": "Eeml5x_d6plG", + "outputId": "19052d52-3ca5-4d1e-9a8a-a0608381b342" + }, + "outputs": [], + "source": [ + "gfs_dt = xr.open_datatree(fsspec.filesystem(\"reference\", fo=gfs_store).get_mapper(\"\"), engine=\"zarr\", consolidated=False)\n", + "gfs_dt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 296 + }, + "id": "MFCPjIJnpKh4", + "outputId": "98c66ef2-8034-4b5e-f239-65e68c679e97" + }, + "outputs": [], + "source": [ + "# Reading the data - especially extracting point time series isn't any faster once you have\n", + "# the xarray datatree. 
This is just a much faster way of building the aggregations than\n", + "# directly running scan_grib over all the data first.\n", + "gfs_dt.sdswrf.avg.surface.sdswrf[0,0:10,300,400].compute()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "gfs_dt.t2m.instant.heightAboveGround.t2m[0,0:10,300,400].compute()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 735 + }, + "id": "PSYb4OasbTKG", + "outputId": "b4031b98-24fa-451d-e460-452dfad3695a" + }, + "outputs": [], + "source": [ + "gfs_dt.sdswrf.avg.surface.sdswrf[0,1,:,:].plot(figsize=(12,8))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 735 + }, + "id": "7zAfk1IshXMH", + "outputId": "640f0354-d7f0-4a5f-f928-1ad1d0fcd481" + }, + "outputs": [], + "source": [ + "gfs_dt.sdswrf.avg.surface.sdswrf[0,2,:,:].plot(figsize=(12,8))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 735 + }, + "id": "T3gM0XLXhoZp", + "outputId": "c37186b5-dbb1-4c98-c6e1-a787a5b85dae" + }, + "outputs": [], + "source": [ + "gfs_dt.sdswrf.avg.surface.sdswrf[0,3,:,:].plot(figsize=(12,8))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Timeseries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# from joblib import parallel_config\n", + "# with parallel_config(n_jobs=8):\n", + " #res = gfs_dt.dswrf.avg.surface.dswrf.interp(longitude=[320.5, 300.2], latitude=[20.6, 45.7], method=\"linear\")\n", + " \n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res = gfs_dt.sdswrf.avg.surface.sdswrf.interp(longitude=[320.5], latitude=[20.6], method=\"linear\")\n", + "res.plot()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "colab": { + "authorship_tag": "ABX9TyPQShGkdrEd0WnSDM4SoI/N", + "include_colab_link": true, + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/kerchunk/_grib_idx.py b/kerchunk/_grib_idx.py index 2ae136c2..1c5ddbc2 100644 --- a/kerchunk/_grib_idx.py +++ b/kerchunk/_grib_idx.py @@ -49,20 +49,42 @@ For more information on indexing, check out this link: https://github.com/asascience-open/nextgen-dmac/pull/57 -""" + +Original license +---------------- +MIT License Copyright (c) 2023 Camus Energy + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the Software without restriction, including without limitation the +rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit +persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright 
notice and this permission notice shall be included in all copies or substantial portions of the +Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +""" # noqa: E501 import ujson import itertools +import os +import gzip +import re import logging import fsspec +from enum import unique, Enum +from functools import cache import warnings -from typing import Iterable, Dict, TYPE_CHECKING, Optional, Callable - - -if TYPE_CHECKING: - import pandas as pd - import datatree +from typing import Iterable, Dict, Optional, Callable, Any +import numpy as np +import pandas as pd +import xarray as xr +import base64 +import copy class DynamicZarrStoreError(ValueError): @@ -71,6 +93,383 @@ class DynamicZarrStoreError(ValueError): logger = logging.getLogger("grib-indexing") +COORD_DIM_MAPPING: dict[str, str] = dict( + time="run_times", + valid_time="valid_times", + step="model_horizons", +) + +ZARR_TREE_STORE = "zarr_tree_store.json.gz" + + +def repeat_steps(step_index: pd.TimedeltaIndex, to_length: int) -> np.array: + return np.tile(step_index.to_numpy(), int(np.ceil(to_length / len(step_index))))[ + :to_length + ] + + +def create_steps(steps_index: pd.Index, to_length) -> np.array: + return np.vstack([repeat_steps(si, to_length) for si in steps_index]) + + +def store_coord_var(key: str, zstore: dict, coords: tuple[str, ...], data: np.array): + if np.isnan(data).any(): + if f"{key}/.zarray" not in zstore: + logger.debug("Skipping nan coordinate with no variable %s", key) + return + else: + logger.info("Trying to add coordinate var %s with nan value!", key) + + zattrs = ujson.loads(zstore[f"{key}/.zattrs"]) + zarray = ujson.loads(zstore[f"{key}/.zarray"]) + # Use list not tuple + zarray["chunks"] = [*data.shape] + zarray["shape"] = [*data.shape] + + if key.endswith("step"): + zarray["dtype"] = " dict: + """ + Given a zarr_store hierarchy, pull out the variables present in the chunks dataframe and reinflate the + zarr variables adding any needed dimensions. This is a select operation - based on the time axis + provided. + Assumes everything is stored in hours per grib convention. 
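+
+    Example (an illustrative sketch, not part of the original docstring; assumes ``k_index``
+    and ``deflated_store`` were built as in the docs notebooks, with pandas imported as ``pd``)::
+
+        axes = [
+            pd.Index(
+                [pd.timedelta_range("0 hours", "6 hours", freq="6h", closed="right")],
+                name="step",
+            ),
+            pd.date_range("2023-09-01T06:00", "2023-10-01T00:00", freq="360min", name="valid_time"),
+        ]
+        store = reinflate_grib_store(
+            axes=axes,
+            aggregation_type=AggregationType.HORIZON,
+            chunk_index=k_index,
+            zarr_ref_store=deflated_store,
+        )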
+ + :param axes: a list of new axes for aggregation + :param aggregation_type: the type of fmrc aggregation + :param chunk_index: a dataframe containing the kerchunk index + :param zarr_ref_store: the deflated (chunks removed) zarr store + :return: the inflated zarr store + """ + # Make a deep copy to avoid modifying the input + zstore = copy.deepcopy(zarr_ref_store["refs"]) + + axes_by_name: dict[str, pd.Index] = {pdi.name: pdi for pdi in axes} + # Validate axis names + time_dims: dict[str, int] = {} + time_coords: dict[str, tuple[str, ...]] = {} + # TODO: add a data class or other method of typing and validating the variables created in this if block + if aggregation_type.value == AggregationType.HORIZON.value: + # Use index length horizons containing timedelta ranges for the set of steps + time_dims["step"] = len(axes_by_name["step"]) + time_dims["valid_time"] = len(axes_by_name["valid_time"]) + + time_coords["step"] = ("step", "valid_time") + time_coords["valid_time"] = ("step", "valid_time") + time_coords["time"] = ("step", "valid_time") + time_coords["datavar"] = ("step", "valid_time") + + steps = create_steps(axes_by_name["step"], time_dims["valid_time"]) + valid_times = np.tile( + axes_by_name["valid_time"].to_numpy(), (time_dims["step"], 1) + ) + times = valid_times - steps + + elif aggregation_type.value == AggregationType.VALID_TIME.value: + # Provide an index of steps and an index of valid times + time_dims["step"] = len(axes_by_name["step"]) + time_dims["valid_time"] = len(axes_by_name["valid_time"]) + + time_coords["step"] = ("step",) + time_coords["valid_time"] = ("valid_time",) + time_coords["time"] = ("valid_time", "step") + time_coords["datavar"] = ("valid_time", "step") + + steps = axes_by_name["step"].to_numpy() + valid_times = axes_by_name["valid_time"].to_numpy() + + steps2d = np.tile(axes_by_name["step"], (time_dims["valid_time"], 1)) + valid_times2d = np.tile( + np.reshape(axes_by_name["valid_time"], (-1, 1)), (1, time_dims["step"]) + ) + times = valid_times2d - steps2d + + elif aggregation_type.value == AggregationType.RUN_TIME.value: + # Provide an index of steps and an index of run times. 
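+        # valid_time is then derived below as run_time + step, broadcast over the
+        # (time, step) grid.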
+ time_dims["step"] = len(axes_by_name["step"]) + time_dims["time"] = len(axes_by_name["time"]) + + time_coords["step"] = ("step",) + time_coords["valid_time"] = ("time", "step") + time_coords["time"] = ("time",) + time_coords["datavar"] = ("time", "step") + + steps = axes_by_name["step"].to_numpy() + times = axes_by_name["time"].to_numpy() + + # The valid times will be runtimes by steps + steps2d = np.tile(axes_by_name["step"], (time_dims["time"], 1)) + times2d = np.tile( + np.reshape(axes_by_name["time"], (-1, 1)), (1, time_dims["step"]) + ) + valid_times = times2d + steps2d + + elif aggregation_type.value == AggregationType.BEST_AVAILABLE.value: + time_dims["valid_time"] = len(axes_by_name["valid_time"]) + assert ( + len(axes_by_name["time"]) == 1 + ), "The time axes must describe a single 'as of' date for best available" + reference_time = axes_by_name["time"].to_numpy()[0] + + time_coords["step"] = ("valid_time",) + time_coords["valid_time"] = ("valid_time",) + time_coords["time"] = ("valid_time",) + time_coords["datavar"] = ("valid_time",) + + valid_times = axes_by_name["valid_time"].to_numpy() + times = np.where(valid_times <= reference_time, valid_times, reference_time) + steps = valid_times - times + else: + raise RuntimeError(f"Invalid aggregation_type argument: {aggregation_type}") + + # Copy all the groups that contain variables in the chunk dataset + unique_groups = chunk_index.set_index( + ["varname", "stepType", "typeOfLevel"] + ).index.unique() + + # Drop keys not in the unique groups + for key in list(zstore.keys()): + # Separate the key as a path keeping only: varname, stepType and typeOfLevel + # Treat root keys like ".zgroup" as special and return an empty tuple + lookup = tuple( + [val for val in os.path.dirname(key).split("/")[:3] if val != ""] + ) + if lookup not in unique_groups: + del zstore[key] + + # Now update the zstore for each variable. + for key, group in chunk_index.groupby(["varname", "stepType", "typeOfLevel"]): + base_path = "/".join(key) + lvals = group.level.unique() + dims = time_dims.copy() + coords = time_coords.copy() + if len(lvals) == 1: + lvals = lvals.squeeze() + dims[key[2]] = 0 + elif len(lvals) > 1: + lvals = np.sort(lvals) + # multipel levels + dims[key[2]] = len(lvals) + coords["datavar"] += (key[2],) + else: + raise ValueError("") + + # Convert to floating point seconds + # td.astype("timedelta64[s]").astype(float) / 3600 # Convert to floating point hours + store_coord_var( + key=f"{base_path}/time", + zstore=zstore, + coords=time_coords["time"], + data=times.astype("datetime64[s]"), + ) + + store_coord_var( + key=f"{base_path}/valid_time", + zstore=zstore, + coords=time_coords["valid_time"], + data=valid_times.astype("datetime64[s]"), + ) + + store_coord_var( + key=f"{base_path}/step", + zstore=zstore, + coords=time_coords["step"], + data=steps.astype("timedelta64[s]").astype("float64") / 3600.0, + ) + + store_coord_var( + key=f"{base_path}/{key[2]}", + zstore=zstore, + coords=(key[2],) if lvals.shape else (), + data=lvals, # all grib levels are floats + ) + + store_data_var( + key=f"{base_path}/{key[0]}", + zstore=zstore, + dims=dims, + coords=coords, + data=group, + steps=steps, + times=times, + lvals=lvals if lvals.shape else None, + ) + + return dict(refs=zstore, version=1) + + +def strip_datavar_chunks( + kerchunk_store: dict, keep_list: tuple[str, ...] = ("latitude", "longitude") +) -> None: + """ + Modify in place a kerchunk reference store to strip the kerchunk references for variables not in the + keep list. 
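+
+    Example (an illustrative sketch; assumes ``grib_tree_store`` came from :func:`grib_tree`)::
+
+        deflated_store = copy.deepcopy(grib_tree_store)  # stripping happens in place
+        strip_datavar_chunks(deflated_store)
+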
:param kerchunk_store: a kerchunk ref spec store
+    :param keep_list: the list of variables to keep references for
+    """
+    zarr_store = kerchunk_store["refs"]
+
+    zchunk_matcher = re.compile(r"^(?P<name>.*)\/(?P<zchunk>\d+[\.\d+]*)$")
+    for key in list(zarr_store.keys()):
+        matched = zchunk_matcher.match(key)
+        if matched:
+            logger.debug("Matched! %s", matched)
+            if any([matched.group("name").endswith(keeper) for keeper in keep_list]):
+                logger.debug("Skipping key %s", matched.group("name"))
+                continue
+            del zarr_store[key]
+
+
+def write_store(metadata_path: str, store: dict):
+    fpath = os.path.join(metadata_path, ZARR_TREE_STORE)
+    compressed = gzip.compress(ujson.dumps(store).encode())
+    with fsspec.open(fpath, "wb") as f:
+        f.write(compressed)
+    logger.info("Wrote %d bytes to %s", len(compressed), fpath)
+
+
+@cache
+def read_store(metadata_path: str) -> dict:
+    """
+    Cached method for loading the static zarr store from a metadata path
+    :param metadata_path: the path (usually gcs) to the metadata directory
+    :return: a kerchunk zarr store reference spec dictionary (deflated)
+    """
+    fpath = os.path.join(metadata_path, ZARR_TREE_STORE)
+    with fsspec.open(fpath, "rb") as f:
+        compressed = f.read()
+    logger.info("Read %d bytes from %s", len(compressed), fpath)
+    zarr_store = ujson.loads(gzip.decompress(compressed).decode())
+    return zarr_store
+
+
+def grib_coord(name: str) -> str:
+    """
+    Take advantage of grib's strict coordinate name structure: use the known coordinate
+    names and assume everything else is "level", which is further described by the
+    gribLevelAttribute.
+    This is helpful because there are a lot of named levels, but they are all float values
+    and each variable can have only one level coordinate.
+    By mapping all of these levels to "level", the sparse chunk index becomes dense.
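+
+    For example (the values follow directly from the rule above)::
+
+        >>> grib_coord("latitude")
+        'latitude'
+        >>> grib_coord("isobaricInhPa")
+        'level'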
+ :param name: + :return: + """ + + if name in ("valid_time", "time", "step", "latitude", "longitude"): + return name + else: + return "level" + def parse_grib_idx( basename: str, @@ -152,7 +551,7 @@ def build_path(path: Iterable[str | None], suffix: Optional[str] = None) -> str: def extract_dataset_chunk_index( - dset: "datatree.DataTree", + dset: "xr.DataTree", ref_store: Dict, grib: bool = False, ) -> list[dict]: @@ -176,13 +575,12 @@ def extract_dataset_chunk_index( ------- list[dict] : returns the extracted grib metadata in the form of key-value pairs inside a list """ - import datatree result: list[dict] = [] attributes = dset.attrs.copy() dpath = None - if isinstance(dset, datatree.DataTree): + if isinstance(dset, xr.DataTree): dpath = dset.path walk_group = dset.parent while walk_group: @@ -257,7 +655,7 @@ def extract_dataset_chunk_index( def extract_datatree_chunk_index( - dtree: "datatree.DataTree", kerchunk_store: dict, grib: bool = False + dtree: "xr.DataTree", kerchunk_store: dict, grib: bool = False ) -> "pd.DataFrame": """ Recursive method to iterate over the data tree and extract the data variable chunks with index metadata @@ -333,7 +731,6 @@ def _map_grib_file_by_group( def _extract_single_group(grib_group: dict, idx: int, storage_options: Dict): - import datatree from kerchunk.grib2 import grib_tree grib_tree_store = grib_tree( @@ -347,7 +744,7 @@ def _extract_single_group(grib_group: dict, idx: int, storage_options: Dict): logger.info("Empty DT: %s", grib_tree_store) return None - dt = datatree.open_datatree( + dt = xr.open_datatree( fsspec.filesystem("reference", fo=grib_tree_store).get_mapper(""), engine="zarr", consolidated=False, diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index f105fe8b..37dcb3ff 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -14,7 +14,17 @@ from kerchunk.utils import class_factory, _encode_for_JSON from kerchunk.codecs import GRIBCodec from kerchunk.combine import MultiZarrToZarr, drop -from kerchunk._grib_idx import parse_grib_idx, build_idx_grib_mapping, map_from_index +from kerchunk._grib_idx import ( + parse_grib_idx, + build_idx_grib_mapping, + map_from_index, + strip_datavar_chunks, + reinflate_grib_store, + extract_datatree_chunk_index, + AggregationType, + read_store, + write_store, +) try: @@ -593,4 +603,10 @@ def correct_hrrr_subhf_step(group: Dict) -> Dict: "parse_grib_idx", "build_idx_grib_mapping", "map_from_index", + "strip_datavar_chunks", + "reinflate_grib_store", + "extract_datatree_chunk_index", + "AggregationType", + "read_store", + "write_store", ] diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py index 32092ced..1cee1e7f 100644 --- a/kerchunk/tests/test_grib.py +++ b/kerchunk/tests/test_grib.py @@ -6,7 +6,6 @@ import pandas as pd import pytest import xarray as xr -import datatree import zarr import ujson from kerchunk.grib2 import ( @@ -271,7 +270,7 @@ def test_hrrr_sfcf_grib_datatree(): with open(fpath, "rb") as fobj: scanned_msgs = ujson.load(fobj) merged = grib_tree(scanned_msgs) - dt = datatree.open_datatree( + dt = xr.open_datatree( fsspec.filesystem("reference", fo=merged).get_mapper(""), engine="zarr", consolidated=False, @@ -349,7 +348,7 @@ def test_parse_grib_idx_content(idx_url, storage_options): def zarr_tree_and_datatree_instance(): fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") tree_store = tree_store = grib_tree(scan_grib(fn)) - dt_instance = datatree.open_datatree( + dt_instance = xr.open_datatree( fsspec.filesystem("reference", 
fo=tree_store).get_mapper(""), engine="zarr", consolidated=False, diff --git a/pyproject.toml b/pyproject.toml index 81df712e..5a79c469 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,8 +44,7 @@ dev = [ "pytest", "s3fs", "types-ujson", - "xarray", - "xarray-datatree", + "xarray>=2024.10.0", "cfgrib", "scipy", "netcdf4"