
Merge pull request #287 from openclimatefix/issue/india-satellite
IODC Satellite
peterdudfield authored Aug 29, 2024
2 parents 45cdedf + 0f27ca9 commit 50135f7
Showing 9 changed files with 433 additions and 241 deletions.
172 changes: 132 additions & 40 deletions satip/app.py
@@ -1,4 +1,13 @@
""" Application that pulls data from the EUMETSAT API and saves to a zarr file"""
""" Application that pulls data from the EUMETSAT API and saves to a zarr file
We now support:
- The 0 deg HR-SEVIRI data - https://masif.eumetsat.int/ossi/webpages/level3.html?ossi_level3_filename=seviri_0deg_hr.html&ossi_level2_filename=seviri_0deg.html
- The 9.5 deg RSS data - https://masif.eumetsat.int/ossi/webpages/level2.html?ossi_level2_filename=seviri_rss.html
- The 45.5 deg IODC data - https://masif.eumetsat.int/ossi/webpages/level2.html?ossi_level2_filename=seviri_iodc.html
By default we pull the RSS data; if it is not available we fall back to the HR-SEVIRI data.
There is also an option to use only the IODC data.
"""
import glob
import os
import random
@@ -11,6 +20,7 @@

import satip
from satip import utils
from satip.download import RSS_ID, SEVIRI_ID, SEVIRI_IODC_ID
from satip.eumetsat import EUMETSATDownloadManager

log = structlog.stdlib.get_logger()
@@ -81,8 +91,8 @@
type=click.BOOL,
)
@click.option(
"--use-backup",
envvar="USE_BACKUP",
"--use-hr-serviri",
envvar="USE_HR_SERVIRI",
default=False,
help="Option not to use the RSS imaginary. If True, use the 15 mins data. ",
type=click.BOOL,
@@ -94,7 +104,14 @@
help="Set the maximum number of dataset to load, default gets them all",
type=click.INT,
)
def run(
@click.option(
"--use-iodc",
envvar="USE_IODC",
default=False,
help="An option to use the IODC data instead of the RSS data.",
type=click.BOOL,
)
def run_click(
api_key,
api_secret,
save_dir,
@@ -104,8 +121,42 @@ def run(
use_rescaler: bool = False,
start_time: str = pd.Timestamp.utcnow().isoformat(timespec="minutes").split("+")[0],
cleanup: bool = False,
use_backup: bool = False,
use_hr_serviri: bool = False,
maximum_n_datasets: int = -1,
use_iodc: bool = False,
):
""" See below for function description.
There is slight duplicate, but testing and debugging is easier with this setup.
"""
run(
api_key,
api_secret,
save_dir,
save_dir_native,
history,
use_rescaler=use_rescaler,
start_time=start_time,
cleanup=cleanup,
use_hr_serviri=use_hr_serviri,
maximum_n_datasets=maximum_n_datasets,
use_iodc=use_iodc
)



def run(
api_key,
api_secret,
save_dir="./",
save_dir_native="./raw",
history="60 minutes",
use_rescaler: bool = False,
start_time: str = pd.Timestamp.utcnow().isoformat(timespec="minutes").split("+")[0],
cleanup: bool = False,
use_hr_serviri: bool = False,
maximum_n_datasets: int = -1,
use_iodc: bool = False,
):
"""Run main application
@@ -115,12 +166,12 @@ def run(
save_dir: Save directory
save_dir_native: where the native files are saved
history: History time
db_url: URL of database
use_rescaler: Rescale data to between 0 and 1 or not
start_time: Start time in UTC ISO Format
cleanup: Cleanup Data Tailor
use_backup: use 15 min data, not RSS
use_hr_serviri: use 15 min data, not RSS
maximum_n_datasets: Set the maximum number of dataset to load, default gets them all
use_iodc: Use IODC data instead
"""

utils.setupLogging()
@@ -141,37 +192,61 @@
)
# 1. Get data from API, download native files
with tempfile.TemporaryDirectory() as tmpdir:
download_manager = EUMETSATDownloadManager(
user_key=api_key,
user_secret=api_secret,
data_dir=tmpdir,
native_file_dir=save_dir_native,
)
if cleanup:
log.debug("Running Data Tailor Cleanup", memory=utils.get_memory())
download_manager.cleanup_datatailor()
return

start_date = pd.Timestamp(start_time, tz="UTC") - pd.Timedelta(history)
log.info(
f"Fetching datasets for {start_date} - {start_time}", memory=utils.get_memory()
)
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"),
end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"),
)
# Check if any RSS imagery is available, if not, fall back to 15 minutely data
if (len(datasets) == 0) or use_backup:
log.warn(
f"No RSS Imagery available or using backup ({use_backup=}), "
f"falling back to 15-minutely data",

if use_iodc:
# get the IODC data
log.info(
f"Fetching IODC datasets for {start_date} - {start_time}",
memory=utils.get_memory(),
)
download_manager = EUMETSATDownloadManager(
user_key=api_key,
user_secret=api_secret,
data_dir=tmpdir,
native_file_dir=save_dir_native,
)
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"),
end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"),
product_id=SEVIRI_IODC_ID,
)

else:
# try RSS first, then fall back to the HR-SEVIRI data if no RSS is available
download_manager = EUMETSATDownloadManager(
user_key=api_key,
user_secret=api_secret,
data_dir=tmpdir,
native_file_dir=save_dir_native,
)
if cleanup:
log.debug("Running Data Tailor Cleanup", memory=utils.get_memory())
download_manager.cleanup_datatailor()
return

datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"),
end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"),
product_id="EO:EUM:DAT:MSG:HRSEVIRI",
)
use_backup = True
# Check if any RSS imagery is available; if not, fall back to the 15-minute data
if (len(datasets) == 0) or use_hr_serviri:
log.warn(
f"No RSS Imagery available or using backup ({use_hr_serviri=}), "
f"falling back to 15-minutely data",
memory=utils.get_memory(),
)
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"),
end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"),
product_id=SEVIRI_ID,
)
use_hr_serviri = True

# Filter out ones that already exist
# if both final files don't exist, then we should make sure we run the whole process
datasets = utils.filter_dataset_ids_on_current_files(datasets, save_dir)
@@ -191,31 +266,43 @@ def run(
datasets = datasets[0:maximum_n_datasets]
random.shuffle(datasets) # Shuffle so subsequent runs might download different data
updated_data = True
if use_backup:
if use_hr_serviri:
# Check before downloading each tailored dataset, as it can take a while
for dset in datasets:
dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
if len(dset) > 0:
download_manager.download_tailored_datasets(
dset,
product_id="EO:EUM:DAT:MSG:HRSEVIRI",
product_id=SEVIRI_ID,
)
elif use_iodc:
# Check before downloading each dataset, as it can take a while
for dset in datasets:
dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
if len(dset) > 0:
# note: we might have to change this to use the Data Tailor
download_manager.download_datasets(
dset,
product_id=SEVIRI_IODC_ID,
)

else:
# Check before downloading each tailored dataset, as it can take a while
for dset in datasets:
dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
if len(dset) > 0:
download_manager.download_datasets(
dset,
product_id="EO:EUM:DAT:MSG:MSG15-RSS",
product_id=RSS_ID,
)

# 2. Load nat files to one Xarray Dataset
native_files = (
list(glob.glob(os.path.join(tmpdir, "*.nat")))
if not use_backup
else list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*")))
)
if use_hr_serviri:
native_files = list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*")))
else:
# RSS or IODC
native_files = list(glob.glob(os.path.join(tmpdir, "*.nat")))

log.debug(
"Saving native files to Zarr: " + native_files.__str__(),
memory=utils.get_memory(),
@@ -225,19 +312,24 @@ def run(
native_files,
save_dir=save_dir,
use_rescaler=use_rescaler,
using_backup=use_backup,
use_hr_serviri=use_hr_serviri,
use_iodc=use_iodc,
)
# Move around files into and out of latest
utils.move_older_files_to_different_location(
save_dir=save_dir, history_time=(start_date - pd.Timedelta("30 min"))
)

if not utils.check_both_final_files_exists(save_dir=save_dir, using_backup=use_backup):
if not utils.check_both_final_files_exists(save_dir=save_dir,
use_hr_serviri=use_hr_serviri,
use_iodc=use_iodc):
updated_data = True

if updated_data:
# Collate files into single NetCDF file
utils.collate_files_into_latest(save_dir=save_dir, using_backup=use_backup)
utils.collate_files_into_latest(save_dir=save_dir,
use_hr_serviri=use_hr_serviri,
use_iodc=use_iodc)
log.debug("Collated files", memory=utils.get_memory())

log.info("Finished Running application", memory=utils.get_memory())
@@ -248,4 +340,4 @@ def run(


if __name__ == "__main__":
run()
run_click()
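The click entry point (`run_click`) now just forwards its arguments to a plain `run` function, which makes the pipeline callable directly from tests or a notebook. A minimal sketch of such a direct call, assuming valid EUMETSAT credentials; the environment-variable names used below are illustrative, not part of the app:

```python
# Sketch: calling run() directly, bypassing the click CLI.
# The credential env-var names below are assumptions for illustration only.
import os

from satip.app import run

run(
    api_key=os.environ["EUMETSAT_API_KEY"],        # assumed variable name
    api_secret=os.environ["EUMETSAT_API_SECRET"],  # assumed variable name
    save_dir="./zarr",
    save_dir_native="./raw",
    history="60 minutes",
    use_iodc=True,         # pull the 45.5 deg IODC data instead of RSS
    maximum_n_datasets=1,  # keep a trial run small
)
```

The CLI equivalent would set `--use-iodc True` (or the `USE_IODC` environment variable) when invoking `run_click`.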
6 changes: 4 additions & 2 deletions satip/download.py
@@ -53,7 +53,7 @@
RSS_ID = "EO:EUM:DAT:MSG:MSG15-RSS"
CLOUD_ID = "EO:EUM:DAT:MSG:RSS-CLM"
SEVIRI_ID = "EO:EUM:DAT:MSG:HRSEVIRI"

SEVIRI_IODC_ID = "EO:EUM:DAT:MSG:HRSEVIRI-IODC"

def download_eumetsat_data(
download_directory,
@@ -117,8 +117,10 @@ def download_eumetsat_data(
products_to_use.append(RSS_ID)
if "cloud" in product:
products_to_use.append(CLOUD_ID)
if "seviri" in product:
if ("seviri" in product) and 'iodc' not in product:
products_to_use.append(SEVIRI_ID)
if "seviri_iodc" in product:
products_to_use.append(SEVIRI_IODC_ID)

for product_id in products_to_use:
# Do this to clear out any partially downloaded days
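For context, the new `SEVIRI_IODC_ID` constant and the extra checks above mean that requesting `"seviri_iodc"` selects the IODC product while plain `"seviri"` still selects the 0 deg HR-SEVIRI product. A standalone sketch of that mapping, assuming `product` is a list of short product names as passed to `download_eumetsat_data`:

```python
# Sketch of the product-name -> EUMETSAT product-ID mapping shown above
# (assumes `product` is a list of short names such as ["rss", "seviri_iodc"]).
RSS_ID = "EO:EUM:DAT:MSG:MSG15-RSS"
CLOUD_ID = "EO:EUM:DAT:MSG:RSS-CLM"
SEVIRI_ID = "EO:EUM:DAT:MSG:HRSEVIRI"
SEVIRI_IODC_ID = "EO:EUM:DAT:MSG:HRSEVIRI-IODC"


def products_for(product: list) -> list:
    """Illustrative restatement of the checks in download_eumetsat_data."""
    products_to_use = []
    if "rss" in product:
        products_to_use.append(RSS_ID)
    if "cloud" in product:
        products_to_use.append(CLOUD_ID)
    if ("seviri" in product) and ("iodc" not in product):
        products_to_use.append(SEVIRI_ID)
    if "seviri_iodc" in product:
        products_to_use.append(SEVIRI_IODC_ID)
    return products_to_use


print(products_for(["seviri_iodc"]))  # ['EO:EUM:DAT:MSG:HRSEVIRI-IODC']
```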
27 changes: 27 additions & 0 deletions satip/filenames.py
@@ -0,0 +1,27 @@
""" Function to do with filenames """
import pandas as pd


def get_datetime_from_filename(filename: str) -> pd.Timestamp:
"""Extract time from filename
For example:
- folder/iodc_202408281115.zarr.zip
- folder/202006011205.zarr.zip
- folder/hrv_202408261815.zarr.zip
- folder/15_hrv_202408261815.zarr.zip
"""

filename = filename.replace("iodc_", "")
filename = filename.replace("15_", "")
filename = filename.replace("hrv_", "")
filename = filename.split(".zarr.zip")[0]
date = filename.split("/")[-1]

file_time = pd.to_datetime(
date,
format="%Y%m%d%H%M",
errors="ignore",
utc=True,
)
return file_time
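A short usage sketch of the new helper, applied to the filename patterns listed in its docstring:

```python
# Sketch: expected output of get_datetime_from_filename for the documented patterns.
from satip.filenames import get_datetime_from_filename

print(get_datetime_from_filename("folder/iodc_202408281115.zarr.zip"))
# 2024-08-28 11:15:00+00:00
print(get_datetime_from_filename("folder/15_hrv_202408261815.zarr.zip"))
# 2024-08-26 18:15:00+00:00
```

The `iodc_`, `15_` and `hrv_` prefixes are stripped before parsing, so each documented pattern resolves to a UTC `pd.Timestamp`.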
2 changes: 1 addition & 1 deletion satip/geospatial.py
@@ -30,7 +30,7 @@

# Geographic bounds for various regions of interest, in order of min_lon, min_lat, max_lon, max_lat
# (see https://satpy.readthedocs.io/en/stable/_modules/satpy/scene.html)
GEOGRAPHIC_BOUNDS = {"UK": (-16, 45, 10, 62), "RSS": (-64, 16, 83, 69)}
GEOGRAPHIC_BOUNDS = {"UK": (-16, 45, 10, 62), "RSS": (-64, 16, 83, 69), "India": (60, 6, 97, 37)}


class Transformers:
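The new "India" entry follows the same (min_lon, min_lat, max_lon, max_lat) ordering as the existing regions. A minimal sketch of reading it; the point-in-bounds check is purely illustrative and not how satip crops scenes:

```python
# Sketch: reading the new "India" bounds (min_lon, min_lat, max_lon, max_lat).
from satip.geospatial import GEOGRAPHIC_BOUNDS

min_lon, min_lat, max_lon, max_lat = GEOGRAPHIC_BOUNDS["India"]


def inside_india(lon: float, lat: float) -> bool:
    """Illustrative point-in-bounds check, not part of satip."""
    return min_lon <= lon <= max_lon and min_lat <= lat <= max_lat


print(inside_india(77.2, 28.6))  # roughly New Delhi -> True
```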

