From 043e3f60a84aff2105b59cf31021110b23a512b2 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 28 Aug 2024 16:42:59 +0100 Subject: [PATCH 01/13] first try at india satellite --- satip/app.py | 127 +++++++++++++----- satip/download.py | 6 +- .../integration_test/test_integration_app.py | 32 ++++- .../test_integration_download.py | 18 ++- .../test_integration_eumetsat.py | 26 ++++ 5 files changed, 166 insertions(+), 43 deletions(-) diff --git a/satip/app.py b/satip/app.py index fb88ef24..b0a3362f 100644 --- a/satip/app.py +++ b/satip/app.py @@ -1,4 +1,13 @@ -""" Application that pulls data from the EUMETSAT API and saves to a zarr file""" +""" Application that pulls data from the EUMETSAT API and saves to a zarr file + +We now support +- The 0 deg HR-SERVIRI data - https://masif.eumetsat.int/ossi/webpages/level3.html?ossi_level3_filename=seviri_0deg_hr.html&ossi_level2_filename=seviri_0deg.html +- The 9.5 deg RSS data - https://masif.eumetsat.int/ossi/webpages/level2.html?ossi_level2_filename=seviri_rss.html +- The 45.5 deg IODC data - https://masif.eumetsat.int/ossi/webpages/level2.html?ossi_level2_filename=seviri_iodc.html + +By-default we pull the RSS data, if not available we try the HR-SERVIRI. +We have an option to just use the IODC data. +""" import glob import os import random @@ -12,6 +21,7 @@ import satip from satip import utils from satip.eumetsat import EUMETSATDownloadManager +from satip.download import RSS_ID, SEVIRI_ID, SEVIRI_IODC_ID log = structlog.stdlib.get_logger() @@ -81,8 +91,8 @@ type=click.BOOL, ) @click.option( - "--use-backup", - envvar="USE_BACKUP", + "--use-hr-serviri", + envvar="USE_HR_SERVIRI", default=False, help="Option not to use the RSS imaginary. If True, use the 15 mins data. ", type=click.BOOL, @@ -94,6 +104,13 @@ help="Set the maximum number of dataset to load, default gets them all", type=click.INT, ) +@click.option( + "--use-iodc", + envvar="USE_IODC", + default=False, + help="An option to use the IODC data instead of the RSS data.", + type=click.BOOL, +) def run( api_key, api_secret, @@ -104,8 +121,9 @@ def run( use_rescaler: bool = False, start_time: str = pd.Timestamp.utcnow().isoformat(timespec="minutes").split("+")[0], cleanup: bool = False, - use_backup: bool = False, + use_hr_serviri: bool = False, maximum_n_datasets: int = -1, + use_iodc: bool = False, ): """Run main application @@ -119,8 +137,9 @@ def run( use_rescaler: Rescale data to between 0 and 1 or not start_time: Start time in UTC ISO Format cleanup: Cleanup Data Tailor - use_backup: use 15 min data, not RSS + use_hr_serviri: use 15 min data, not RSS maximum_n_datasets: Set the maximum number of dataset to load, default gets them all + use_iodc: Use IODC data instead """ utils.setupLogging() @@ -141,37 +160,60 @@ def run( ) # 1. Get data from API, download native files with tempfile.TemporaryDirectory() as tmpdir: - download_manager = EUMETSATDownloadManager( - user_key=api_key, - user_secret=api_secret, - data_dir=tmpdir, - native_file_dir=save_dir_native, - ) - if cleanup: - log.debug("Running Data Tailor Cleanup", memory=utils.get_memory()) - download_manager.cleanup_datatailor() - return + start_date = pd.Timestamp(start_time, tz="UTC") - pd.Timedelta(history) log.info( f"Fetching datasets for {start_date} - {start_time}", memory=utils.get_memory() ) - datasets = download_manager.identify_available_datasets( - start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), - end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"), - ) - # Check if any RSS imagery is available, if not, fall back to 15 minutely data - if (len(datasets) == 0) or use_backup: - log.warn( - f"No RSS Imagery available or using backup ({use_backup=}), " - f"falling back to 15-minutely data", + + if not use_iodc: + # if not iodc, try rss, then get hr_serviri data if not rss + download_manager = EUMETSATDownloadManager( + user_key=api_key, + user_secret=api_secret, + data_dir=tmpdir, + native_file_dir=save_dir_native, + ) + if cleanup: + log.debug("Running Data Tailor Cleanup", memory=utils.get_memory()) + download_manager.cleanup_datatailor() + return + + datasets = download_manager.identify_available_datasets( + start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), + end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"), + ) + # Check if any RSS imagery is available, if not, fall back to 15 minutely data + if (len(datasets) == 0) or use_hr_serviri: + log.warn( + f"No RSS Imagery available or using backup ({use_hr_serviri=}), " + f"falling back to 15-minutely data", + memory=utils.get_memory(), + ) + datasets = download_manager.identify_available_datasets( + start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), + end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"), + product_id=SEVIRI_ID, + ) + use_hr_serviri = True + else: + # get the IODC data + log.info( + f"Fetching IODC datasets for {start_date} - {start_time}", memory=utils.get_memory(), ) + download_manager = EUMETSATDownloadManager( + user_key=api_key, + user_secret=api_secret, + data_dir=tmpdir, + native_file_dir=save_dir_native, + ) datasets = download_manager.identify_available_datasets( start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"), - product_id="EO:EUM:DAT:MSG:HRSEVIRI", + product_id=SEVIRI_IODC_ID, ) - use_backup = True + # Filter out ones that already exist # if both final files don't exist, then we should make sure we run the whole process datasets = utils.filter_dataset_ids_on_current_files(datasets, save_dir) @@ -191,15 +233,26 @@ def run( datasets = datasets[0:maximum_n_datasets] random.shuffle(datasets) # Shuffle so subsequent runs might download different data updated_data = True - if use_backup: + if use_hr_serviri: # Check before downloading each tailored dataset, as it can take awhile for dset in datasets: dset = utils.filter_dataset_ids_on_current_files([dset], save_dir) if len(dset) > 0: download_manager.download_tailored_datasets( dset, - product_id="EO:EUM:DAT:MSG:HRSEVIRI", + product_id=SEVIRI_ID, + ) + elif use_iodc: + # Check before downloading each dataset, as it can take a while + for dset in datasets: + dset = utils.filter_dataset_ids_on_current_files([dset], save_dir) + if len(dset) > 0: + # not we might have to change this to the data taylor + download_manager.download_datasets( + dset, + product_id=SEVIRI_IODC_ID, ) + else: # Check before downloading each tailored dataset, as it can take awhile for dset in datasets: @@ -207,15 +260,15 @@ def run( if len(dset) > 0: download_manager.download_datasets( dset, - product_id="EO:EUM:DAT:MSG:MSG15-RSS", + product_id=RSS_ID, ) # 2. Load nat files to one Xarray Dataset - native_files = ( - list(glob.glob(os.path.join(tmpdir, "*.nat"))) - if not use_backup - else list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*"))) - ) + if use_hr_serviri or use_iodc: + native_files = list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*"))) + else: + native_files = list(glob.glob(os.path.join(tmpdir, "*.nat"))) + log.debug( "Saving native files to Zarr: " + native_files.__str__(), memory=utils.get_memory(), @@ -225,19 +278,19 @@ def run( native_files, save_dir=save_dir, use_rescaler=use_rescaler, - using_backup=use_backup, + using_backup=use_hr_serviri, ) # Move around files into and out of latest utils.move_older_files_to_different_location( save_dir=save_dir, history_time=(start_date - pd.Timedelta("30 min")) ) - if not utils.check_both_final_files_exists(save_dir=save_dir, using_backup=use_backup): + if not utils.check_both_final_files_exists(save_dir=save_dir, using_backup=use_hr_serviri or use_iodc): updated_data = True if updated_data: # Collate files into single NetCDF file - utils.collate_files_into_latest(save_dir=save_dir, using_backup=use_backup) + utils.collate_files_into_latest(save_dir=save_dir, using_backup=use_hr_serviri or use_iodc) log.debug("Collated files", memory=utils.get_memory()) log.info("Finished Running application", memory=utils.get_memory()) diff --git a/satip/download.py b/satip/download.py index 412e05d3..7792c7c8 100644 --- a/satip/download.py +++ b/satip/download.py @@ -53,7 +53,7 @@ RSS_ID = "EO:EUM:DAT:MSG:MSG15-RSS" CLOUD_ID = "EO:EUM:DAT:MSG:RSS-CLM" SEVIRI_ID = "EO:EUM:DAT:MSG:HRSEVIRI" - +SEVIRI_IODC_ID = "EO:EUM:DAT:MSG:HRSEVIRI-IODC" def download_eumetsat_data( download_directory, @@ -117,8 +117,10 @@ def download_eumetsat_data( products_to_use.append(RSS_ID) if "cloud" in product: products_to_use.append(CLOUD_ID) - if "seviri" in product: + if ("seviri" in product) and not ('iodc' in product): products_to_use.append(SEVIRI_ID) + if "seviri_iodc" in product: + products_to_use.append(SEVIRI_IODC_ID) for product_id in products_to_use: # Do this to clear out any partially downloaded days diff --git a/tests/integration_test/test_integration_app.py b/tests/integration_test/test_integration_app.py index db493a2e..65e2ea3d 100644 --- a/tests/integration_test/test_integration_app.py +++ b/tests/integration_test/test_integration_app.py @@ -43,6 +43,36 @@ def test_save_to_netcdf(): # noqa 103 assert len(native_files) > 0 +@freeze_time("2024-06-28 12:00:00") # Date with IODC +def test_save_to_netcdf(): # noqa 103 + user_key = os.environ.get("EUMETSAT_USER_KEY") + user_secret = os.environ.get("EUMETSAT_USER_SECRET") + with tempfile.TemporaryDirectory() as tmpdirname: + response = runner.invoke( + run, + [ + "--api-key", + user_key, + "--api-secret", + user_secret, + "--save-dir", + tmpdirname, + "--use-rescaler", + False, + "--start-time", + datetime.datetime.utcnow().isoformat(), + "--maximum-n-datasets", + 1, + "--use-iodc", + 1 + ], + catch_exceptions=False, + ) + assert response.exit_code == 0, response.exception + native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) + assert len(native_files) > 0 + + def test_save_to_netcdf_now(): # noqa 103 user_key = os.environ.get("EUMETSAT_USER_KEY") user_secret = os.environ.get("EUMETSAT_USER_SECRET") @@ -169,7 +199,7 @@ def test_use_backup(): # noqa 103 False, "--start-time", datetime.datetime.utcnow().isoformat(), - "--use-backup", + "--use-hr-serviri", True, "--maximum-n-datasets", 1, diff --git a/tests/integration_test/test_integration_download.py b/tests/integration_test/test_integration_download.py index 5b9b7884..0db19b12 100644 --- a/tests/integration_test/test_integration_download.py +++ b/tests/integration_test/test_integration_download.py @@ -5,14 +5,12 @@ import pandas as pd from satip.download import ( - _determine_datetimes_to_download_files, - _get_missing_datetimes_from_list_of_files, download_eumetsat_data, ) from satip.utils import format_dt_str -class TestDownload(): +class TestDownload: """Test case for downloader tests.""" def test_download_eumetsat_data(self): # noqa @@ -28,3 +26,17 @@ def test_download_eumetsat_data(self): # noqa product=["cloud", "rss"], enforce_full_days=False, ) is None + + def test_download_eumetsat_iodc_data(self): # noqa + # Call the downloader on a very short chunk of data for IODC: + assert download_eumetsat_data( + download_directory=str(os.getcwd() + "/storage/"), + start_date=format_dt_str("2024-06-01 11:59:00"), + end_date=format_dt_str("2024-06-01 12:02:00"), + user_key=os.environ.get("EUMETSAT_USER_KEY"), + user_secret=os.environ.get("EUMETSAT_USER_SECRET"), + auth_filename=None, + number_of_processes=2, + product=["seviri_iodc"], + enforce_full_days=False, + ) is None diff --git a/tests/integration_test/test_integration_eumetsat.py b/tests/integration_test/test_integration_eumetsat.py index 894b74f8..0b55873d 100644 --- a/tests/integration_test/test_integration_eumetsat.py +++ b/tests/integration_test/test_integration_eumetsat.py @@ -85,3 +85,29 @@ def test_data_tailor(): native_files = list(glob.glob(os.path.join(tmpdirname, "*HRSEVIRI_HRV"))) assert len(native_files) > 0 + + +def test_data_download_iodc(): + """If there were a test here, there would also be a docstring here.""" + + user_key = os.environ.get("EUMETSAT_USER_KEY") + user_secret = os.environ.get("EUMETSAT_USER_SECRET") + + start_date = datetime.now(tz=timezone.utc) - timedelta(hours=2) + end_date = datetime.now(tz=timezone.utc) + + with tempfile.TemporaryDirectory() as tmpdirname: + download_manager = EUMETSATDownloadManager( + user_key=user_key, + user_secret=user_secret, + data_dir=tmpdirname, + native_file_dir=tmpdirname, + ) + + datasets = download_manager.identify_available_datasets( + start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), + end_date=end_date.strftime("%Y-%m-%d-%H:%M:%S"), + product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC", + ) + + assert len(datasets) > 0 From 4b2166354a4694c14b5a40675dba0e2a0649f7a8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 15:46:48 +0000 Subject: [PATCH 02/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- satip/app.py | 2 +- satip/download.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/satip/app.py b/satip/app.py index b0a3362f..62a905bd 100644 --- a/satip/app.py +++ b/satip/app.py @@ -20,8 +20,8 @@ import satip from satip import utils -from satip.eumetsat import EUMETSATDownloadManager from satip.download import RSS_ID, SEVIRI_ID, SEVIRI_IODC_ID +from satip.eumetsat import EUMETSATDownloadManager log = structlog.stdlib.get_logger() diff --git a/satip/download.py b/satip/download.py index 7792c7c8..458188ca 100644 --- a/satip/download.py +++ b/satip/download.py @@ -117,7 +117,7 @@ def download_eumetsat_data( products_to_use.append(RSS_ID) if "cloud" in product: products_to_use.append(CLOUD_ID) - if ("seviri" in product) and not ('iodc' in product): + if ("seviri" in product) and 'iodc' not in product: products_to_use.append(SEVIRI_ID) if "seviri_iodc" in product: products_to_use.append(SEVIRI_IODC_ID) From 3d4b19aefdfcbf8c4ddb1f3ae6ddda3118f983ac Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 28 Aug 2024 18:44:52 +0100 Subject: [PATCH 03/13] got test working on app --- satip/app.py | 50 ++++- satip/geospatial.py | 2 +- satip/utils.py | 137 +++++++++----- .../integration_test/test_integration_app.py | 176 ++++++------------ .../test_integration_eumetsat.py | 3 +- 5 files changed, 195 insertions(+), 173 deletions(-) diff --git a/satip/app.py b/satip/app.py index 62a905bd..4388d750 100644 --- a/satip/app.py +++ b/satip/app.py @@ -111,7 +111,7 @@ help="An option to use the IODC data instead of the RSS data.", type=click.BOOL, ) -def run( +def run_click( api_key, api_secret, save_dir, @@ -124,6 +124,39 @@ def run( use_hr_serviri: bool = False, maximum_n_datasets: int = -1, use_iodc: bool = False, +): + """ See below for function description. + + There is slight duplicate, but testing adn debugging is easier with this setup. + """ + run( + api_key, + api_secret, + save_dir, + save_dir_native, + history, + use_rescaler=use_rescaler, + start_time=start_time, + cleanup=cleanup, + use_hr_serviri=use_hr_serviri, + maximum_n_datasets=maximum_n_datasets, + use_iodc=use_iodc + ) + + + +def run( + api_key, + api_secret, + save_dir = './', + save_dir_native = "./raw", + history="60 minutes", + use_rescaler: bool = False, + start_time: str = pd.Timestamp.utcnow().isoformat(timespec="minutes").split("+")[0], + cleanup: bool = False, + use_hr_serviri: bool = False, + maximum_n_datasets: int = -1, + use_iodc: bool = False, ): """Run main application @@ -133,7 +166,6 @@ def run( save_dir: Save directory save_dir_native: where the native files are saved history: History time - db_url: URL of database use_rescaler: Rescale data to between 0 and 1 or not start_time: Start time in UTC ISO Format cleanup: Cleanup Data Tailor @@ -247,7 +279,7 @@ def run( for dset in datasets: dset = utils.filter_dataset_ids_on_current_files([dset], save_dir) if len(dset) > 0: - # not we might have to change this to the data taylor + # note we might have to change this to the data taylor download_manager.download_datasets( dset, product_id=SEVIRI_IODC_ID, @@ -264,9 +296,10 @@ def run( ) # 2. Load nat files to one Xarray Dataset - if use_hr_serviri or use_iodc: + if use_hr_serviri: native_files = list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*"))) else: + # RSS or IODC native_files = list(glob.glob(os.path.join(tmpdir, "*.nat"))) log.debug( @@ -278,19 +311,20 @@ def run( native_files, save_dir=save_dir, use_rescaler=use_rescaler, - using_backup=use_hr_serviri, + use_hr_serviri=use_hr_serviri, + use_iodc=use_iodc, ) # Move around files into and out of latest utils.move_older_files_to_different_location( save_dir=save_dir, history_time=(start_date - pd.Timedelta("30 min")) ) - if not utils.check_both_final_files_exists(save_dir=save_dir, using_backup=use_hr_serviri or use_iodc): + if not utils.check_both_final_files_exists(save_dir=save_dir, use_hr_serviri=use_hr_serviri, use_iodc=use_iodc): updated_data = True if updated_data: # Collate files into single NetCDF file - utils.collate_files_into_latest(save_dir=save_dir, using_backup=use_hr_serviri or use_iodc) + utils.collate_files_into_latest(save_dir=save_dir, use_hr_serviri=use_hr_serviri, use_iodc=use_iodc) log.debug("Collated files", memory=utils.get_memory()) log.info("Finished Running application", memory=utils.get_memory()) @@ -301,4 +335,4 @@ def run( if __name__ == "__main__": - run() + run_click() diff --git a/satip/geospatial.py b/satip/geospatial.py index b0e9aeb0..0015046e 100644 --- a/satip/geospatial.py +++ b/satip/geospatial.py @@ -30,7 +30,7 @@ # Geographic bounds for various regions of interest, in order of min_lon, min_lat, max_lon, max_lat # (see https://satpy.readthedocs.io/en/stable/_modules/satpy/scene.html) -GEOGRAPHIC_BOUNDS = {"UK": (-16, 45, 10, 62), "RSS": (-64, 16, 83, 69)} +GEOGRAPHIC_BOUNDS = {"UK": (-16, 45, 10, 62), "RSS": (-64, 16, 83, 69), "India": (68, 6, 97, 37)} class Transformers: diff --git a/satip/utils.py b/satip/utils.py index fd89463f..f6e6c119 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -293,10 +293,11 @@ def convert_scene_to_dataarray( dataarray.x_osgb.attrs["name"] = "Easting" dataarray.y_osgb.attrs["name"] = "Northing" + log.info("Calculated OSGB", memory=get_memory()) for name in ["x", "y"]: dataarray[name].attrs["coordinate_reference_system"] = "geostationary" - log.info("Calculated OSGB", memory=get_memory()) + # Round to the nearest 5 minutes dataarray.attrs.update(data_attrs) dataarray.attrs["end_time"] = pd.Timestamp(dataarray.attrs["end_time"]).round("5 min") @@ -341,7 +342,12 @@ def do_v15_rescaling( return dataarray -def get_dataset_from_scene(filename: str, hrv_scaler, use_rescaler: bool, save_dir, using_backup): +def get_dataset_from_scene(filename: str, + hrv_scaler, + use_rescaler: bool, + save_dir, + use_hr_serviri, + use_iodc=False): """ Returns the Xarray dataset from the filename """ @@ -358,22 +364,29 @@ def get_dataset_from_scene(filename: str, hrv_scaler, use_rescaler: bool, save_d generate=False, ) - log.debug("Loaded HRV", memory=get_memory()) - hrv_dataarray: xr.DataArray = convert_scene_to_dataarray( - hrv_scene, band="HRV", area="UK", calculate_osgb=True - ) + if not use_iodc: + log.debug("Loaded HRV", memory=get_memory()) + hrv_dataarray: xr.DataArray = convert_scene_to_dataarray( + hrv_scene, band="HRV", area="UK", calculate_osgb=True + ) + else: + hrv_dataarray: xr.DataArray = convert_scene_to_dataarray( + hrv_scene, band="HRV", area="India", calculate_osgb=False + ) log.debug("Converted HRV to dataarray", memory=get_memory()) del hrv_scene attrs = serialize_attrs(hrv_dataarray.attrs) - if use_rescaler: - hrv_dataarray = hrv_scaler.rescale(hrv_dataarray) - else: - hrv_dataarray = do_v15_rescaling( - hrv_dataarray, - variable_order=["HRV"], - maxs=HRV_SCALER_MAX, - mins=HRV_SCALER_MIN, - ) + + if not use_iodc: + if use_rescaler: + hrv_dataarray = hrv_scaler.rescale(hrv_dataarray) + else: + hrv_dataarray = do_v15_rescaling( + hrv_dataarray, + variable_order=["HRV"], + maxs=HRV_SCALER_MAX, + mins=HRV_SCALER_MIN, + ) hrv_dataarray = hrv_dataarray.transpose( "time", "y_geostationary", "x_geostationary", "variable" ) @@ -390,7 +403,13 @@ def get_dataset_from_scene(filename: str, hrv_scaler, use_rescaler: bool, save_d gc.collect() return - save_file = os.path.join(save_dir, f"{'15_' if using_backup else ''}hrv_{now_time}.zarr.zip") + filename = f"hrv_{now_time}.zarr.zip" + if use_hr_serviri: + filename = f"15_{filename}" + if use_iodc: + filename = f"iodc_{now_time}.zarr.zip" + + save_file = os.path.join(save_dir, filename) log.debug(f"Saving HRV netcdf in {save_file}", memory=get_memory()) save_to_zarr_to_backend(hrv_dataset, save_file) del hrv_dataset @@ -499,7 +518,8 @@ def save_native_to_zarr( bands: list = ALL_BANDS, save_dir: str = "./", use_rescaler: bool = False, - using_backup: bool = False, + use_hr_serviri: bool = False, + use_iodc: bool = False, ) -> None: """ Saves native files to NetCDF for consumer @@ -509,11 +529,12 @@ def save_native_to_zarr( bands: Bands to save save_dir: Directory to save the netcdf files use_rescaler: Whether to rescale between 0 and 1 or not - using_backup: Whether the input data is the backup 15 minutely data or not + use_hr_serviri: Whether the input data is the backup 15 minutely data or not + use_iodc: Whether to use the IODC data or not """ log.debug( - f"Converting from {'HRIT' if using_backup else 'native'} to zarr in {save_dir}", + f"Converting from {'HRIT' if use_hr_serviri else 'native'} to zarr in {save_dir}", memory=get_memory(), ) @@ -527,21 +548,23 @@ def save_native_to_zarr( ) for f in list_of_native_files: log.debug(f"Processing {f}", memory=get_memory()) - if "EPCT" in f: + if use_iodc: + get_dataset_from_scene(f, hrv_scaler, False, save_dir, False, use_iodc=use_iodc) + elif "EPCT" in f: log.debug(f"Processing HRIT file {f}", memory=get_memory()) if "HRV" in f: log.debug(f"Processing HRV {f}", memory=get_memory()) - get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, using_backup) + get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, use_hr_serviri) else: log.debug(f"Processing non-HRV {f}", memory=get_memory()) - get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, using_backup) + get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, use_hr_serviri) else: if "HRV" in bands: log.debug(f"Processing HRV {f}", memory=get_memory()) - get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, using_backup) + get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, use_hr_serviri) log.debug(f"Processing non-HRV {f}", memory=get_memory()) - get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, using_backup) + get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, use_hr_serviri) log.debug(f"Finished processing files: {list_of_native_files}", memory=get_memory()) @@ -765,7 +788,7 @@ def filter_dataset_ids_on_current_files(datasets: list, save_dir: str) -> list: continue finished_datetimes.append( pd.to_datetime( - date.replace("15_", "").split(".zarr.zip")[0].split("/")[-1], + date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1], format="%Y%m%d%H%M", errors="ignore", ) @@ -838,14 +861,14 @@ def move_older_files_to_different_location(save_dir: str, history_time: pd.Times continue if "hrv" in date: file_time = pd.to_datetime( - date.replace("15_", "").split(".zarr.zip")[0].split("/")[-1].split("_")[-1], + date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1].split("_")[-1], format="%Y%m%d%H%M", errors="ignore", utc=True, ) else: file_time = pd.to_datetime( - date.replace("15_", "").split(".zarr.zip")[0].split("/")[-1], + date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1], format="%Y%m%d%H%M", errors="ignore", utc=True, @@ -879,7 +902,7 @@ def move_older_files_to_different_location(save_dir: str, history_time: pd.Times ) else: file_time = pd.to_datetime( - date.replace("15_", "").split(".zarr.zip")[0].split("/")[-1], + date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1], format="%Y%m%d%H%M", errors="ignore", utc=True, @@ -890,11 +913,20 @@ def move_older_files_to_different_location(save_dir: str, history_time: pd.Times filesystem.move(date, f"{save_dir}/{date.split('/')[-1]}") -def check_both_final_files_exists(save_dir: str, using_backup: bool = False): +def check_both_final_files_exists(save_dir: str, use_hr_serviri: bool = False, use_iodc: bool = False): """Check that both final files exists""" latest_dir = get_latest_subdir_path(save_dir) - hrv_filename = f"{latest_dir}/hrv_latest{'_15' if using_backup else ''}.zarr.zip" - filename = f"{latest_dir}/latest{'_15' if using_backup else ''}.zarr.zip" + hrv_filename = f"{latest_dir}/hrv_latest{'_15' if use_hr_serviri else ''}.zarr.zip" + filename = f"{latest_dir}/latest{'_15' if use_hr_serviri else ''}.zarr.zip" + + if use_iodc: + filename = f"{latest_dir}/iodc_latest.zarr.zip" + if fsspec.open(filename).fs.exists(filename): + log.debug(f"{filename} exists") + return True + else: + log.debug(f"{filename} doesnt exists") + return False log.debug(f"Checking {hrv_filename} and or {filename} exists") @@ -931,31 +963,41 @@ def add_backend_to_filenames(files, backend): raise ValueError(f"Unsupported backend: {backend}") -def collate_files_into_latest(save_dir: str, using_backup: bool = False, backend: str = "s3"): +def collate_files_into_latest(save_dir: str, use_hr_serviri: bool = False, use_iodc: bool=False): """ Convert individual files into single latest file for HRV and non-HRV Args: save_dir: Directory where data is being saved - using_backup: Whether the input data is made up of the 15 minutely backup data or not - backend: Backend type, e.g., "s3", "gs", "az", or "local" + use_hr_serviri: Whether the input data is made up of the 15 minutely backup data or not """ filesystem = fsspec.open(save_dir).fs latest_dir = get_latest_subdir_path(save_dir) - hrv_files = list( - filesystem.glob(f"{latest_dir}/{'15_' if using_backup else ''}hrv_2*.zarr.zip") - ) - if not hrv_files: # Empty set of files, don't do anything + + files = list(filesystem.glob(f"{latest_dir}/hrv_2*.zarr.zip")) + if use_hr_serviri: + files = list(filesystem.glob(f"{latest_dir}/15_hrv_2*.zarr.zip")) + if use_iodc: + log.debug("Collating IODC files") + files = list(filesystem.glob(f"{latest_dir}/iodc_2*.zarr.zip")) + + if not files: # Empty set of files, don't do anything return # Add prefix to beginning of each URL - filename = f"{latest_dir}/hrv_latest{'_15' if using_backup else ''}.zarr.zip" + filename = f"{latest_dir}/hrv_latest.zarr.zip" + if use_hr_serviri: + filename = f"{latest_dir}/15_hrv_latest.zarr.zip" + if use_iodc: + filename = f"{latest_dir}/iodc_latest.zarr.zip" filename_temp = f"{latest_dir}/hrv_tmp_{secrets.token_hex(6)}.zarr.zip" + log.debug(f"Collating HRV files {filename}") - hrv_files = add_backend_to_filenames(hrv_files, backend) # Added backend prefix for hrv files - log.debug(hrv_files) + if "s3" in save_dir: + files = add_backend_to_filenames(files, backend="s3") # Added backend prefix for hrv files + log.debug(files) dataset = ( xr.open_mfdataset( - hrv_files, + files, concat_dim="time", combine="nested", engine="zarr", @@ -982,11 +1024,16 @@ def collate_files_into_latest(save_dir: str, using_backup: bool = False, backend new_times = xr.open_dataset(f"zip::{filename}", engine="zarr").time log.debug(f"{filename} {new_times}") - filename = f"{latest_dir}/latest{'_15' if using_backup else ''}.zarr.zip" + if use_iodc: + return + filename = f"{latest_dir}/latest{'_15' if use_hr_serviri else ''}.zarr.zip" filename_temp = f"{latest_dir}/tmp_{secrets.token_hex(6)}.zarr.zip" log.debug(f"Collating non-HRV files {filename}") - nonhrv_files = list(filesystem.glob(f"{latest_dir}/{'15_' if using_backup else ''}2*.zarr.zip")) - nonhrv_files = add_backend_to_filenames(nonhrv_files, backend) # backend prefix for nonhrv + nonhrv_files = list(filesystem.glob(f"{latest_dir}/{'15_' if use_hr_serviri else ''}2*.zarr.zip")) + + if "s3" in save_dir: + nonhrv_files = add_backend_to_filenames(nonhrv_files, backend="s3") # backend prefix for nonhrv + log.debug(nonhrv_files) o_dataset = ( xr.open_mfdataset( diff --git a/tests/integration_test/test_integration_app.py b/tests/integration_test/test_integration_app.py index 65e2ea3d..ded58f01 100644 --- a/tests/integration_test/test_integration_app.py +++ b/tests/integration_test/test_integration_app.py @@ -20,55 +20,38 @@ def test_save_to_netcdf(): # noqa 103 user_key = os.environ.get("EUMETSAT_USER_KEY") user_secret = os.environ.get("EUMETSAT_USER_SECRET") with tempfile.TemporaryDirectory() as tmpdirname: - response = runner.invoke( - run, - [ - "--api-key", - user_key, - "--api-secret", - user_secret, - "--save-dir", - tmpdirname, - "--use-rescaler", - False, - "--start-time", - datetime.datetime.utcnow().isoformat(), - "--maximum-n-datasets", - 1, - ], - catch_exceptions=False, - ) - assert response.exit_code == 0, response.exception + + run(api_key=user_key, + api_secret=user_secret, + save_dir=tmpdirname, + use_rescaler=False, + start_time=datetime.datetime.utcnow().isoformat(), + maximum_n_datasets=1) + native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) assert len(native_files) > 0 -@freeze_time("2024-06-28 12:00:00") # Date with IODC -def test_save_to_netcdf(): # noqa 103 +@freeze_time("2024-08-28 12:00:00") # Date with IODC +def test_iodc(): # noqa 103 + + from satip import utils + utils.setupLogging() + user_key = os.environ.get("EUMETSAT_USER_KEY") user_secret = os.environ.get("EUMETSAT_USER_SECRET") with tempfile.TemporaryDirectory() as tmpdirname: - response = runner.invoke( - run, - [ - "--api-key", - user_key, - "--api-secret", - user_secret, - "--save-dir", - tmpdirname, - "--use-rescaler", - False, - "--start-time", - datetime.datetime.utcnow().isoformat(), - "--maximum-n-datasets", - 1, - "--use-iodc", - 1 - ], - catch_exceptions=False, - ) - assert response.exit_code == 0, response.exception + + run(api_key=user_key, + api_secret=user_secret, + save_dir=tmpdirname, + save_dir_native=tmpdirname, + use_rescaler=False, + start_time=datetime.datetime.utcnow().isoformat(), + maximum_n_datasets=1, + history="15 minutes", + use_iodc=True) + native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) assert len(native_files) > 0 @@ -76,24 +59,15 @@ def test_save_to_netcdf(): # noqa 103 def test_save_to_netcdf_now(): # noqa 103 user_key = os.environ.get("EUMETSAT_USER_KEY") user_secret = os.environ.get("EUMETSAT_USER_SECRET") + with tempfile.TemporaryDirectory() as tmpdirname: - response = runner.invoke( - run, - [ - "--api-key", - user_key, - "--api-secret", - user_secret, - "--save-dir", - tmpdirname, - "--use-rescaler", - False, - "--maximum-n-datasets", - 1, - ], - catch_exceptions=False, - ) - assert response.exit_code == 0, response.exception + + run(api_key=user_key, + api_secret=user_secret, + save_dir=tmpdirname, + use_rescaler=False, + maximum_n_datasets=1) + native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) assert len(native_files) > 0 @@ -102,25 +76,14 @@ def test_cleanup_now(): # noqa 103 user_key = os.environ.get("EUMETSAT_USER_KEY") user_secret = os.environ.get("EUMETSAT_USER_SECRET") with tempfile.TemporaryDirectory() as tmpdirname: - response = runner.invoke( - run, - [ - "--api-key", - user_key, - "--api-secret", - user_secret, - "--save-dir", - tmpdirname, - "--use-rescaler", - False, - "--cleanup", - True, - "--maximum-n-datasets", - 1, - ], - catch_exceptions=False, - ) - assert response.exit_code == 0, response.exception + + run(api_key=user_key, + api_secret=user_secret, + save_dir=tmpdirname + ,use_rescaler=False, + cleanup=True, + maximum_n_datasets=1) + native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) assert len(native_files) == 0 @@ -159,25 +122,14 @@ def test_save_to_netcdf_rescaled(): # noqa 103 user_key = os.environ.get("EUMETSAT_USER_KEY") user_secret = os.environ.get("EUMETSAT_USER_SECRET") with tempfile.TemporaryDirectory() as tmpdirname: - response = runner.invoke( - run, - [ - "--api-key", - user_key, - "--api-secret", - user_secret, - "--save-dir", - tmpdirname, - "--use-rescaler", - True, - "--start-time", - datetime.datetime.utcnow().isoformat(), - "--maximum-n-datasets", - 1, - ], - catch_exceptions=False, - ) - assert response.exit_code == 0, response.exception + + run(api_key=user_key, + api_secret=user_secret, + save_dir=tmpdirname, + use_rescaler=True, + start_time=datetime.datetime.utcnow().isoformat(), + maximum_n_datasets=1) + native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) assert len(native_files) > 0 @@ -186,26 +138,14 @@ def test_use_backup(): # noqa 103 user_key = os.environ.get("EUMETSAT_USER_KEY") user_secret = os.environ.get("EUMETSAT_USER_SECRET") with tempfile.TemporaryDirectory() as tmpdirname: - response = runner.invoke( - run, - [ - "--api-key", - user_key, - "--api-secret", - user_secret, - "--save-dir", - tmpdirname, - "--use-rescaler", - False, - "--start-time", - datetime.datetime.utcnow().isoformat(), - "--use-hr-serviri", - True, - "--maximum-n-datasets", - 1, - ], - catch_exceptions=False, - ) - assert response.exit_code == 0, response.exception + + run(api_key=user_key, + api_secret=user_secret, + save_dir=tmpdirname, + use_rescaler=False, + start_time=datetime.datetime.utcnow().isoformat(), + maximum_n_datasets=1, + use_hr_serviri=True) + native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) assert len(native_files) > 0 diff --git a/tests/integration_test/test_integration_eumetsat.py b/tests/integration_test/test_integration_eumetsat.py index 0b55873d..b1a86f25 100644 --- a/tests/integration_test/test_integration_eumetsat.py +++ b/tests/integration_test/test_integration_eumetsat.py @@ -6,6 +6,7 @@ import pandas as pd from satip.eumetsat import EUMETSATDownloadManager, eumetsat_filename_to_datetime +from satip.download import SEVIRI_IODC_ID def test_download_manager_setup(): @@ -107,7 +108,7 @@ def test_data_download_iodc(): datasets = download_manager.identify_available_datasets( start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), end_date=end_date.strftime("%Y-%m-%d-%H:%M:%S"), - product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC", + product_id=SEVIRI_IODC_ID, ) assert len(datasets) > 0 From d03de6cddd8986bdb043d7c767caefb7beb01b90 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 28 Aug 2024 18:49:52 +0100 Subject: [PATCH 04/13] lint --- satip/app.py | 8 ++++++-- satip/utils.py | 47 +++++++++++++++++++++++++---------------------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/satip/app.py b/satip/app.py index 4388d750..c9249863 100644 --- a/satip/app.py +++ b/satip/app.py @@ -319,12 +319,16 @@ def run( save_dir=save_dir, history_time=(start_date - pd.Timedelta("30 min")) ) - if not utils.check_both_final_files_exists(save_dir=save_dir, use_hr_serviri=use_hr_serviri, use_iodc=use_iodc): + if not utils.check_both_final_files_exists(save_dir=save_dir, + use_hr_serviri=use_hr_serviri, + use_iodc=use_iodc): updated_data = True if updated_data: # Collate files into single NetCDF file - utils.collate_files_into_latest(save_dir=save_dir, use_hr_serviri=use_hr_serviri, use_iodc=use_iodc) + utils.collate_files_into_latest(save_dir=save_dir, + use_hr_serviri=use_hr_serviri, + use_iodc=use_iodc) log.debug("Collated files", memory=utils.get_memory()) log.info("Finished Running application", memory=utils.get_memory()) diff --git a/satip/utils.py b/satip/utils.py index f6e6c119..ea3d0e5b 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -152,9 +152,7 @@ def load_native_to_dataarray( Returns: Returns Xarray DataArray if script worked, else returns None """ - hrv_scaler = ScaleToZeroToOne( - variable_order=["HRV"], maxs=HRV_SCALER_MAX, mins=HRV_SCALER_MIN - ) + hrv_scaler = ScaleToZeroToOne(variable_order=["HRV"], maxs=HRV_SCALER_MAX, mins=HRV_SCALER_MIN) scaler = ScaleToZeroToOne( mins=SCALER_MINS, maxs=SCALER_MAXS, @@ -177,9 +175,7 @@ def load_native_to_dataarray( "HRV", ] ) - scene.load( - NON_HRV_BANDS - ) + scene.load(NON_HRV_BANDS) # HRV covers a smaller portion of the disk than other bands, so use that as the bounds # Selected bounds empirically for have no NaN values from off disk image, # and are covering the UK + a bit @@ -332,7 +328,7 @@ def do_v15_rescaling( dataarray = dataarray.reindex({"variable": variable_order}).transpose( "time", "y_geostationary", "x_geostationary", "variable" ) - upper_bound = (2**10) - 1 + upper_bound = (2 ** 10) - 1 new_max = maxs - mins dataarray -= mins @@ -342,12 +338,9 @@ def do_v15_rescaling( return dataarray -def get_dataset_from_scene(filename: str, - hrv_scaler, - use_rescaler: bool, - save_dir, - use_hr_serviri, - use_iodc=False): +def get_dataset_from_scene( + filename: str, hrv_scaler, use_rescaler: bool, save_dir, use_hr_serviri, use_iodc=False +): """ Returns the Xarray dataset from the filename """ @@ -450,7 +443,10 @@ def get_nonhrv_dataset_from_scene( scene = load_native_from_zip(filename) else: scene = load_hrit_from_zip(filename, sections=list(range(6, 9))) - scene.load(NON_HRV_BANDS, generate=False,) + scene.load( + NON_HRV_BANDS, + generate=False, + ) log.debug(f"Loaded non-hrv file: {filename}", memory=get_memory()) dataarray: xr.DataArray = convert_scene_to_dataarray( scene, band="IR_016", area="UK", calculate_osgb=True @@ -543,9 +539,7 @@ def save_native_to_zarr( maxs=SCALER_MAXS, variable_order=NON_HRV_BANDS, ) - hrv_scaler = ScaleToZeroToOne( - variable_order=["HRV"], maxs=HRV_SCALER_MAX, mins=HRV_SCALER_MIN - ) + hrv_scaler = ScaleToZeroToOne(variable_order=["HRV"], maxs=HRV_SCALER_MAX, mins=HRV_SCALER_MIN) for f in list_of_native_files: log.debug(f"Processing {f}", memory=get_memory()) if use_iodc: @@ -861,7 +855,11 @@ def move_older_files_to_different_location(save_dir: str, history_time: pd.Times continue if "hrv" in date: file_time = pd.to_datetime( - date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1].split("_")[-1], + date.replace("iodc_", "") + .replace("15_", "") + .split(".zarr.zip")[0] + .split("/")[-1] + .split("_")[-1], format="%Y%m%d%H%M", errors="ignore", utc=True, @@ -913,7 +911,9 @@ def move_older_files_to_different_location(save_dir: str, history_time: pd.Times filesystem.move(date, f"{save_dir}/{date.split('/')[-1]}") -def check_both_final_files_exists(save_dir: str, use_hr_serviri: bool = False, use_iodc: bool = False): +def check_both_final_files_exists( + save_dir: str, use_hr_serviri: bool = False, use_iodc: bool = False +): """Check that both final files exists""" latest_dir = get_latest_subdir_path(save_dir) hrv_filename = f"{latest_dir}/hrv_latest{'_15' if use_hr_serviri else ''}.zarr.zip" @@ -963,7 +963,7 @@ def add_backend_to_filenames(files, backend): raise ValueError(f"Unsupported backend: {backend}") -def collate_files_into_latest(save_dir: str, use_hr_serviri: bool = False, use_iodc: bool=False): +def collate_files_into_latest(save_dir: str, use_hr_serviri: bool = False, use_iodc: bool = False): """ Convert individual files into single latest file for HRV and non-HRV @@ -1029,10 +1029,13 @@ def collate_files_into_latest(save_dir: str, use_hr_serviri: bool = False, use_i filename = f"{latest_dir}/latest{'_15' if use_hr_serviri else ''}.zarr.zip" filename_temp = f"{latest_dir}/tmp_{secrets.token_hex(6)}.zarr.zip" log.debug(f"Collating non-HRV files {filename}") - nonhrv_files = list(filesystem.glob(f"{latest_dir}/{'15_' if use_hr_serviri else ''}2*.zarr.zip")) + nonhrv_files = list( + filesystem.glob(f"{latest_dir}/{'15_' if use_hr_serviri else ''}2*.zarr.zip") + ) if "s3" in save_dir: - nonhrv_files = add_backend_to_filenames(nonhrv_files, backend="s3") # backend prefix for nonhrv + nonhrv_files = add_backend_to_filenames(nonhrv_files, backend="s3") + # backend prefix for nonhrv log.debug(nonhrv_files) o_dataset = ( From 4fa4e2ad01ad0f4e229c97ab05634e652fc27914 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 28 Aug 2024 18:50:47 +0100 Subject: [PATCH 05/13] lint --- satip/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/satip/utils.py b/satip/utils.py index ea3d0e5b..262e65c3 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -970,6 +970,7 @@ def collate_files_into_latest(save_dir: str, use_hr_serviri: bool = False, use_i Args: save_dir: Directory where data is being saved use_hr_serviri: Whether the input data is made up of the 15 minutely backup data or not + use_iodc: Whether to use the IODC data or not """ filesystem = fsspec.open(save_dir).fs latest_dir = get_latest_subdir_path(save_dir) From 8128dee3eee62bb32ddb5fe5f74dabd7be4b1f31 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 28 Aug 2024 22:03:40 +0100 Subject: [PATCH 06/13] PR self comments --- satip/app.py | 2 +- satip/filenames.py | 26 ++++++++++ satip/utils.py | 47 ++++--------------- .../integration_test/test_integration_app.py | 8 +++- tests/unit_test/test_filenames.py | 18 +++++++ 5 files changed, 59 insertions(+), 42 deletions(-) create mode 100644 satip/filenames.py create mode 100644 tests/unit_test/test_filenames.py diff --git a/satip/app.py b/satip/app.py index c9249863..1760fdbc 100644 --- a/satip/app.py +++ b/satip/app.py @@ -127,7 +127,7 @@ def run_click( ): """ See below for function description. - There is slight duplicate, but testing adn debugging is easier with this setup. + There is slight duplicate, but testing and debugging is easier with this setup. """ run( api_key, diff --git a/satip/filenames.py b/satip/filenames.py new file mode 100644 index 00000000..6ce4fcc4 --- /dev/null +++ b/satip/filenames.py @@ -0,0 +1,26 @@ +""" Function to do with filenames """ +import pandas as pd + +def get_datetime_from_filename(filename: str) -> pd.Timestamp: + """Extract time from filename + + For example: + - folder/iodc_202408281115.zarr.zip + - folder/202006011205.zarr.zip + - folder/hrv_202408261815.zarr.zip + - folder/15_hrv_202408261815.zarr.zip + """ + + filename = filename.replace("iodc_", "") + filename = filename.replace("15_", "") + filename = filename.replace("hrv_", "") + filename = filename.split(".zarr.zip")[0] + date = filename.split("/")[-1] + + file_time = pd.to_datetime( + date, + format="%Y%m%d%H%M", + errors="ignore", + utc=True, + ) + return file_time diff --git a/satip/utils.py b/satip/utils.py index 262e65c3..40e52260 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -40,6 +40,7 @@ SCALER_MAXS, SCALER_MINS, ) +from satip.filenames import get_datetime_from_filename from satip.geospatial import GEOGRAPHIC_BOUNDS, lat_lon_to_osgb from satip.scale_to_zero_to_one import ScaleToZeroToOne, compress_mask from satip.serialize import serialize_attrs @@ -780,13 +781,7 @@ def filter_dataset_ids_on_current_files(datasets: list, save_dir: str) -> list: for date in finished_files: if "latest.zarr" in date or "latest_15.zarr" in date or "tmp" in date: continue - finished_datetimes.append( - pd.to_datetime( - date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1], - format="%Y%m%d%H%M", - errors="ignore", - ) - ) + finished_datetimes.append(get_datetime_from_filename(filename=date)) if len(finished_datetimes) > 0: log.debug(f"The already downloaded finished datetime are {finished_datetimes}") else: @@ -853,24 +848,9 @@ def move_older_files_to_different_location(save_dir: str, history_time: pd.Times log.debug(f"Looking at file {date}") if "latest.zarr" in date or "tmp" in date: continue - if "hrv" in date: - file_time = pd.to_datetime( - date.replace("iodc_", "") - .replace("15_", "") - .split(".zarr.zip")[0] - .split("/")[-1] - .split("_")[-1], - format="%Y%m%d%H%M", - errors="ignore", - utc=True, - ) - else: - file_time = pd.to_datetime( - date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1], - format="%Y%m%d%H%M", - errors="ignore", - utc=True, - ) + + file_time = get_datetime_from_filename(date) + if file_time > history_time: log.debug("Moving file into {LATEST_DIR_NAME} folder") # Move HRV and non-HRV to new place @@ -891,20 +871,9 @@ def move_older_files_to_different_location(save_dir: str, history_time: pd.Times log.debug(f"Looking at file {date}") if "latest.zarr" in date or "latest_15.zarr" in date or "tmp" in date: continue - if "hrv" in date: - file_time = pd.to_datetime( - date.replace("15_", "").split(".zarr.zip")[0].split("/")[-1].split("_")[-1], - format="%Y%m%d%H%M", - errors="ignore", - utc=True, - ) - else: - file_time = pd.to_datetime( - date.replace("iodc_", "").replace("15_", "").split(".zarr.zip")[0].split("/")[-1], - format="%Y%m%d%H%M", - errors="ignore", - utc=True, - ) + + file_time = get_datetime_from_filename(date) + if file_time < history_time: log.debug("Moving file out of {LATEST_DIR_NAME} folder") # Move HRV and non-HRV to new place diff --git a/tests/integration_test/test_integration_app.py b/tests/integration_test/test_integration_app.py index ded58f01..48e125ab 100644 --- a/tests/integration_test/test_integration_app.py +++ b/tests/integration_test/test_integration_app.py @@ -26,6 +26,7 @@ def test_save_to_netcdf(): # noqa 103 save_dir=tmpdirname, use_rescaler=False, start_time=datetime.datetime.utcnow().isoformat(), + history="30 minutes", maximum_n_datasets=1) native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) @@ -66,6 +67,7 @@ def test_save_to_netcdf_now(): # noqa 103 api_secret=user_secret, save_dir=tmpdirname, use_rescaler=False, + history="30 minutes", maximum_n_datasets=1) native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) @@ -79,8 +81,8 @@ def test_cleanup_now(): # noqa 103 run(api_key=user_key, api_secret=user_secret, - save_dir=tmpdirname - ,use_rescaler=False, + save_dir=tmpdirname, + use_rescaler=False, cleanup=True, maximum_n_datasets=1) @@ -128,6 +130,7 @@ def test_save_to_netcdf_rescaled(): # noqa 103 save_dir=tmpdirname, use_rescaler=True, start_time=datetime.datetime.utcnow().isoformat(), + history="30 minutes", maximum_n_datasets=1) native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) @@ -145,6 +148,7 @@ def test_use_backup(): # noqa 103 use_rescaler=False, start_time=datetime.datetime.utcnow().isoformat(), maximum_n_datasets=1, + history="30 minutes", use_hr_serviri=True) native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip"))) diff --git a/tests/unit_test/test_filenames.py b/tests/unit_test/test_filenames.py new file mode 100644 index 00000000..acb4eb0d --- /dev/null +++ b/tests/unit_test/test_filenames.py @@ -0,0 +1,18 @@ +from satip.filenames import get_datetime_from_filename +import pandas as pd + + +def test_get_time_from_filename(): + + datetime = get_datetime_from_filename("folder/iodc_202408281115.zarr.zip") + assert datetime == pd.Timestamp("2024-08-28 11:15", tz="UTC") + + datetime = get_datetime_from_filename("folder/202006011205.zarr.zip") + assert datetime == pd.Timestamp("2020-06-01 12:05", tz="UTC") + + datetime = get_datetime_from_filename("folder/hrv_202408261815.zarr.zip") + assert datetime == pd.Timestamp("2024-08-26 18:15", tz="UTC") + + datetime = get_datetime_from_filename("folder/15_hrv_202408261815.zarr.zip") + assert datetime == pd.Timestamp("2024-08-26 18:15", tz="UTC") + From a282504484451a5d726d464a02e89e319da66fe1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 21:04:06 +0000 Subject: [PATCH 07/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- satip/filenames.py | 1 + tests/unit_test/test_filenames.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/satip/filenames.py b/satip/filenames.py index 6ce4fcc4..c01cde2e 100644 --- a/satip/filenames.py +++ b/satip/filenames.py @@ -1,6 +1,7 @@ """ Function to do with filenames """ import pandas as pd + def get_datetime_from_filename(filename: str) -> pd.Timestamp: """Extract time from filename diff --git a/tests/unit_test/test_filenames.py b/tests/unit_test/test_filenames.py index acb4eb0d..7f19453c 100644 --- a/tests/unit_test/test_filenames.py +++ b/tests/unit_test/test_filenames.py @@ -15,4 +15,3 @@ def test_get_time_from_filename(): datetime = get_datetime_from_filename("folder/15_hrv_202408261815.zarr.zip") assert datetime == pd.Timestamp("2024-08-26 18:15", tz="UTC") - From d90fd2d74864bc9d0dc9465c5f3e9d4b1fe60818 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Thu, 29 Aug 2024 11:27:24 +0100 Subject: [PATCH 08/13] update iodc, as its non hrv --- satip/utils.py | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/satip/utils.py b/satip/utils.py index 40e52260..2f20526c 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -435,7 +435,7 @@ def data_quality_filter(ds: xr.Dataset, threshold_fraction: float = 0.9) -> bool def get_nonhrv_dataset_from_scene( - filename: str, scaler, use_rescaler: bool, save_dir, using_backup + filename: str, scaler, use_rescaler: bool, save_dir, use_hr_serviri, use_iodc:bool=False ): """ Returns the Xarray dataset from the filename @@ -448,21 +448,29 @@ def get_nonhrv_dataset_from_scene( NON_HRV_BANDS, generate=False, ) + log.debug(f"Loaded non-hrv file: {filename}", memory=get_memory()) - dataarray: xr.DataArray = convert_scene_to_dataarray( - scene, band="IR_016", area="UK", calculate_osgb=True - ) + if not use_iodc: + dataarray: xr.DataArray = convert_scene_to_dataarray( + scene, band="IR_016", area="UK", calculate_osgb=True + ) + else: + dataarray: xr.DataArray = convert_scene_to_dataarray( + scene, band="IR_016", area="India", calculate_osgb=False + ) + log.debug(f"Converted non-HRV file {filename} to dataarray", memory=get_memory()) del scene attrs = serialize_attrs(dataarray.attrs) - if use_rescaler: - dataarray = scaler.rescale(dataarray) - else: - dataarray = do_v15_rescaling( - dataarray, - mins=SCALER_MINS, - maxs=SCALER_MAXS, - variable_order=NON_HRV_BANDS, + if not use_iodc: + if use_rescaler: + dataarray = scaler.rescale(dataarray) + else: + dataarray = do_v15_rescaling( + dataarray, + mins=SCALER_MINS, + maxs=SCALER_MAXS, + variable_order=NON_HRV_BANDS, ) dataarray = dataarray.transpose("time", "y_geostationary", "x_geostationary", "variable") dataarray = dataarray.chunk((1, 256, 256, 1)) @@ -478,7 +486,13 @@ def get_nonhrv_dataset_from_scene( gc.collect() return - save_file = os.path.join(save_dir, f"{'15_' if using_backup else ''}{now_time}.zarr.zip") + filename = f"hrv_{now_time}.zarr.zip" + if use_hr_serviri: + filename = f"15_{filename}" + if use_iodc: + filename = f"iodc_{now_time}.zarr.zip" + + save_file = os.path.join(save_dir, filename) log.debug(f"Saving non-HRV netcdf in {save_file}", memory=get_memory()) save_to_zarr_to_backend(dataset, save_file) del dataset @@ -544,7 +558,7 @@ def save_native_to_zarr( for f in list_of_native_files: log.debug(f"Processing {f}", memory=get_memory()) if use_iodc: - get_dataset_from_scene(f, hrv_scaler, False, save_dir, False, use_iodc=use_iodc) + get_nonhrv_dataset_from_scene(f, hrv_scaler, False, save_dir, False, use_iodc=use_iodc) elif "EPCT" in f: log.debug(f"Processing HRIT file {f}", memory=get_memory()) if "HRV" in f: From 43054a74f6e630718826a7cf382450e3aa34b4b1 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Thu, 29 Aug 2024 11:30:13 +0100 Subject: [PATCH 09/13] add comment --- satip/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/satip/utils.py b/satip/utils.py index 2f20526c..0b4b49bd 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -451,6 +451,7 @@ def get_nonhrv_dataset_from_scene( log.debug(f"Loaded non-hrv file: {filename}", memory=get_memory()) if not use_iodc: + # Note band isn't really used, but its just needs to be a valid band dataarray: xr.DataArray = convert_scene_to_dataarray( scene, band="IR_016", area="UK", calculate_osgb=True ) From ae5cded1797de653f60b6f5c59ca023ebdfdcb26 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Thu, 29 Aug 2024 11:33:01 +0100 Subject: [PATCH 10/13] expand india box to start at 60 degress lat --- satip/geospatial.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/satip/geospatial.py b/satip/geospatial.py index 0015046e..5b550ca8 100644 --- a/satip/geospatial.py +++ b/satip/geospatial.py @@ -30,7 +30,7 @@ # Geographic bounds for various regions of interest, in order of min_lon, min_lat, max_lon, max_lat # (see https://satpy.readthedocs.io/en/stable/_modules/satpy/scene.html) -GEOGRAPHIC_BOUNDS = {"UK": (-16, 45, 10, 62), "RSS": (-64, 16, 83, 69), "India": (68, 6, 97, 37)} +GEOGRAPHIC_BOUNDS = {"UK": (-16, 45, 10, 62), "RSS": (-64, 16, 83, 69), "India": (60, 6, 97, 37)} class Transformers: From 50d4fa42edb8429e0d7b68c567514bd2bfb52d98 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Thu, 29 Aug 2024 12:04:57 +0100 Subject: [PATCH 11/13] scale iodc results --- satip/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/satip/utils.py b/satip/utils.py index 0b4b49bd..188cf94b 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -559,7 +559,7 @@ def save_native_to_zarr( for f in list_of_native_files: log.debug(f"Processing {f}", memory=get_memory()) if use_iodc: - get_nonhrv_dataset_from_scene(f, hrv_scaler, False, save_dir, False, use_iodc=use_iodc) + get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, False, use_iodc=use_iodc) elif "EPCT" in f: log.debug(f"Processing HRIT file {f}", memory=get_memory()) if "HRV" in f: From 76c45e84f438cd8f94e13d116f50ebcf185c6aae Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Thu, 29 Aug 2024 12:27:47 +0100 Subject: [PATCH 12/13] add scalling note, (no scalling for iodc) --- satip/utils.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/satip/utils.py b/satip/utils.py index 188cf94b..8e16b958 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -559,7 +559,13 @@ def save_native_to_zarr( for f in list_of_native_files: log.debug(f"Processing {f}", memory=get_memory()) if use_iodc: - get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, False, use_iodc=use_iodc) + get_nonhrv_dataset_from_scene(f, + scaler, + use_rescaler, + save_dir, + False, + use_iodc=use_iodc) + # note we don't do any scaling with iodc elif "EPCT" in f: log.debug(f"Processing HRIT file {f}", memory=get_memory()) if "HRV" in f: From 0f27ca90fbec1907352252dff1c36f083e9e6a78 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Thu, 29 Aug 2024 16:04:57 +0100 Subject: [PATCH 13/13] PR comments --- satip/app.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/satip/app.py b/satip/app.py index 1760fdbc..30ba1e4c 100644 --- a/satip/app.py +++ b/satip/app.py @@ -198,8 +198,26 @@ def run( f"Fetching datasets for {start_date} - {start_time}", memory=utils.get_memory() ) - if not use_iodc: - # if not iodc, try rss, then get hr_serviri data if not rss + if use_iodc: + # get the IODC data + log.info( + f"Fetching IODC datasets for {start_date} - {start_time}", + memory=utils.get_memory(), + ) + download_manager = EUMETSATDownloadManager( + user_key=api_key, + user_secret=api_secret, + data_dir=tmpdir, + native_file_dir=save_dir_native, + ) + datasets = download_manager.identify_available_datasets( + start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), + end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"), + product_id=SEVIRI_IODC_ID, + ) + + else: + # try rss, then get hr_serviri data if not rss download_manager = EUMETSATDownloadManager( user_key=api_key, user_secret=api_secret, @@ -228,23 +246,6 @@ def run( product_id=SEVIRI_ID, ) use_hr_serviri = True - else: - # get the IODC data - log.info( - f"Fetching IODC datasets for {start_date} - {start_time}", - memory=utils.get_memory(), - ) - download_manager = EUMETSATDownloadManager( - user_key=api_key, - user_secret=api_secret, - data_dir=tmpdir, - native_file_dir=save_dir_native, - ) - datasets = download_manager.identify_available_datasets( - start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"), - end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"), - product_id=SEVIRI_IODC_ID, - ) # Filter out ones that already exist # if both final files don't exist, then we should make sure we run the whole process