Skip to content

Commit

Permalink
Merge pull request #29 from podaac/release/0.4.0
Browse files Browse the repository at this point in the history
Release/0.4.0
  • Loading branch information
frankinspace authored Sep 11, 2024
2 parents 8bdd36f + d0aac66 commit 2f2dd9c
Show file tree
Hide file tree
Showing 17 changed files with 546 additions and 306 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ jobs:
-Dsonar.tests=tests/
-Dsonar.projectName=${{ github.repository }}
-Dsonar.projectVersion=${{ env.software_version }}
-Dsonar.python.version=3.9,3.10
-Dsonar.python.version=3.10,3.11
- name: Run Snyk as a blocking step
uses: snyk/actions/python-3.10@master
env:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/release-created.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
with:
repository: ${{ github.repository }}
token: ${{ steps.podaac-cicd.outputs.token }}
ref: 'refs/heads/develop'
- uses: actions/setup-python@v5
with:
python-version: '3.10'
Expand Down
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed

## [0.4.0]
### Changed
- [issues/25](https://github.com/podaac/net2cog/issues/25): Converted harmony adapter to operate on STAC catalog
- [issues/3](https://github.com/podaac/net2cog/issues/3): Improved error handling and updated test cases to use new-style harmony execution

## [0.3.0]
### Changed
Expand All @@ -30,5 +36,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Setup process for deploying the netcdf reformatter to SIT using Terraform deployment via Jenkins. In order to accomplish this I setup unique terraform naming conventions for the netcdf converter while maintaining the same terraform config as l2ss. Updated the jenkins logic to allow for SIT deployment testing.


[Unreleased]: https://github.com/podaac/net2cog/compare/v0.3.0...HEAD
[Unreleased]: https://github.com/podaac/net2cog/compare/v0.4.0...HEAD
[0.4.0]: https://github.com/podaac/net2cog/compare/v0.3.0...v0.4.0
[0.3.0]: https://github.com/podaac/net2cog/compare/eabb00704a6fc693aa4d79536dc5c5354c6de4d9...v0.3.0
14 changes: 12 additions & 2 deletions cmr/netcdf_cmr_umm_s.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,18 @@
],
"SupportedInputFormat": "NETCDF-4"
}
]
],
"Subset": {
"VariableSubset": {
"AllowMultipleValues": false
}
}
},
"OperationMetadata": [
{
"OperationName": "VARIABLE_SUBSETTING"
}
],
"AccessConstraints": "None",
"ServiceKeywords": [
{
Expand Down Expand Up @@ -69,4 +79,4 @@
"Name": "UMM-S",
"Version": "1.5.2"
}
}
}
2 changes: 1 addition & 1 deletion cmr/ops_associations.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
C1234410736-POCLOUD
C1940468263-POCLOUD
3 changes: 2 additions & 1 deletion cmr/uat_associations.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
C1247485690-LARC_CLOUD
C1247485690-LARC_CLOUD
C1234410736-POCLOUD
8 changes: 5 additions & 3 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@ FROM python:3.10-slim
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get upgrade -y \
gcc \
g++ \
libnetcdf-dev \
libhdf5-dev \
hdf5-helpers \
libgdal-dev \
&& pip3 install --upgrade pip \
&& pip3 install cython \
&& apt-get clean

# Create a new user
RUN adduser --quiet --disabled-password --shell /bin/sh --home /home/dockeruser --gecos "" --uid 1000 dockeruser
USER dockeruser
ENV HOME /home/dockeruser
ENV PYTHONPATH "${PYTHONPATH}:/home/dockeruser/.local/bin"
ENV HOME=/home/dockeruser
ENV PYTHONPATH="${PYTHONPATH}:/home/dockeruser/.local/bin"
ENV PATH="/home/dockeruser/.local/bin:${PATH}"

# The 'SOURCE' argument is what will be used in 'pip install'.
Expand All @@ -43,7 +45,7 @@ WORKDIR /worker

COPY --chown=dockeruser $DIST_PATH $DIST_PATH
USER dockeruser
RUN pip3 install --no-cache-dir --force --user --index-url https://pypi.org/simple/ --extra-index-url https://test.pypi.org/simple/ $SOURCE \
RUN pip install --no-cache-dir --force --user --index-url https://pypi.org/simple/ --extra-index-url https://test.pypi.org/simple/ $SOURCE \
&& rm -rf $DIST_PATH

COPY --chown=dockeruser ./docker/docker-entrypoint.sh docker-entrypoint.sh
Expand Down
27 changes: 18 additions & 9 deletions net2cog/netcdf_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,36 @@
Functions related to converting a NetCDF file to other formats.
"""

import logging
import os
import pathlib
from os.path import join as pjoin, basename, dirname, exists, splitext
import subprocess
from subprocess import check_call

import logging
import tempfile
from os.path import join as pjoin, basename, dirname, exists, splitext
from subprocess import check_call
from typing import List

import xarray as xr
import rasterio
import rioxarray # noqa
import xarray as xr
from rasterio import CRS

from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles

import rioxarray # noqa
from rioxarray.exceptions import DimensionError

LOGGER = logging.getLogger(__name__)
EXCLUDE_VARS = ['lon', 'lat', 'longitude', 'latitude', 'time']


class Net2CogError(Exception):
"""
Exception raised when an error occurs while converting a NetCDF file to COG
"""

def __init__(self, msg):
super().__init__(msg)


def run_command(command, work_dir):
"""
A simple utility to execute a subprocess command.
Expand Down Expand Up @@ -188,7 +194,10 @@ def netcdf_converter(input_nc_file: pathlib.Path, output_cog_pathname: pathlib.P
# xds_reversed = xds.reindex(lat=xds.lat[::-1])
LOGGER.info("Writing COG to %s", basename(gtiff_fname))
if var_list:
xds = xds[var_list]
try:
xds = xds[var_list]
except KeyError as error:
raise Net2CogError(f"Variable {error} not found in dataset") from error
return _write_cogtiff(gtiff_fname, xds)
LOGGER.error("%s: NetCDF file does not contain spatial dimensions such as lat / lon "
"or x / y", netcdf_file)
Expand Down
152 changes: 94 additions & 58 deletions net2cog/netcdf_convert_harmony.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@
Implementation of harmony-service-lib that invokes the netcdf converter.
"""
import argparse
import json
import os
import pathlib
import shutil
import tempfile

import harmony
import pystac
from harmony.exceptions import HarmonyException
from pystac import Asset

from net2cog import netcdf_convert
from net2cog.netcdf_convert import Net2CogError

DATA_DIRECTORY_ENV = "DATA_DIRECTORY"

Expand All @@ -25,70 +32,99 @@ class NetcdfConverterService(harmony.BaseHarmonyAdapter):
for documentation and examples.
"""

def __init__(self, message):
super().__init__(message)
def __init__(self, message, catalog=None, config=None):
super().__init__(message, catalog, config)

self.data_dir = os.getenv(DATA_DIRECTORY_ENV, '/home/dockeruser/data')
self.job_data_dir = os.path.join(self.data_dir, message.requestId)
pathlib.Path(self.data_dir).mkdir(parents=True, exist_ok=True)

# Create temp directory
pathlib.Path(self.job_data_dir).mkdir(parents=True, exist_ok=True)
self.job_data_dir = tempfile.mkdtemp(prefix=message.requestId, dir=self.data_dir)

def invoke(self):
"""Run the service on the message contained in `self.message`.
Fetches data, runs the service, puts the result in a file,
calls back to Harmony, and cleans up after itself.
def process_item(self, item: pystac.Item, source: harmony.message.Source) -> pystac.Item:
"""

logger = self.logger
message = self.message

logger.info("Received message %s", message)

Performs net2cog on input STAC Item's data, returning
an output STAC item
Parameters
----------
item : pystac.Item
the item that should be coggified
source : harmony.message.Source
the input source defining the item
Returns
-------
pystac.Item
a STAC item describing the output
"""
result = item.clone()
result.assets = {}
output_dir = self.job_data_dir
try:
# Limit to the first granule. See note in method documentation
granules = message.granules
if message.isSynchronous:
granules = granules[:1]

for i, granule in enumerate(granules):
self.download_granules([granule])

self.logger.info('local_filename = %s', granule.local_filename)
directory_name = os.path.splitext(os.path.basename(granule.local_filename))[0]
output_file_directory = os.path.join(self.job_data_dir,
f'converted_{directory_name}')
output_filename = pathlib.Path(f'{output_file_directory}').joinpath(os.path.basename(granule.name))
self.logger.debug('output: %s', output_filename)

# Run the netcdf converter for the complete netcdf granule
cogs_generated = netcdf_convert.netcdf_converter(
granule.local_filename, output_filename
)
current_progress = int(100 * i / len(granules))
next_progress = int(100 * (i + 1) / len(granules))
for cog in cogs_generated:
if message.isSynchronous:
self.completed_with_local_file(
cog,
remote_filename=os.path.basename(cog),
mime="tiff"
)
else:
self.async_add_local_file_partial_result(
cog,
remote_filename=os.path.basename(cog),
title=granule.id,
progress=current_progress if cog != cogs_generated[-1] else next_progress,
mime="tiff"
)
if not message.isSynchronous:
self.async_completed_successfully()

except Exception as ex: # pylint: disable=W0703
logger.exception(ex)
self.completed_with_error('An unexpected error occurred')
self.logger.info('Input item: %s', json.dumps(item.to_dict()))
self.logger.info('Input source: %s', source)
# Get the data file
asset = next(v for k, v in item.assets.items() if 'data' in (v.roles or []))
self.logger.info('Downloading %s to %s', asset.href, output_dir)
input_filename = harmony.adapter.util.download(asset.href,
output_dir,
logger=self.logger,
access_token=self.message.accessToken,
cfg=self.config)

# Generate output filename
output_filename, output_file_ext = os.path.splitext(
harmony.adapter.util.generate_output_filename(asset.href, ext='tif'))
output_filename = f'{output_filename}_converted{output_file_ext}'

# Determine variables that need processing
self.logger.info('Generating COG(s) for %s output will be saved to %s', input_filename, output_filename)
var_list = source.process('variables')
if not isinstance(var_list, list):
var_list = [var_list]
if len(var_list) != 1:
raise HarmonyException(
'net2cog harmony adapter currently only supports processing one variable at a time. '
'Please specify a single variable in your Harmony request.')
var_list = list(map(lambda var: var.name, var_list))
self.logger.info('Processing variables %s', var_list)

# Run the netcdf converter for the complete netcdf granule
try:
cog_generated = next(iter(netcdf_convert.netcdf_converter(pathlib.Path(input_filename),
pathlib.Path(output_dir).joinpath(
output_filename),
var_list=var_list)), [])
except Net2CogError as error:
raise HarmonyException(
f'net2cog failed to convert {asset.title}: {error}') from error
except Exception as uncaught_exception:
raise HarmonyException(str(f'Uncaught error in net2cog. '
f'Notify net2cog service provider. '
f'Message: {uncaught_exception}')) from uncaught_exception

# Stage the output file with a conventional filename
self.logger.info('Generated COG %s', cog_generated)
staged_filename = os.path.basename(cog_generated)
url = harmony.adapter.util.stage(cog_generated,
staged_filename,
pystac.MediaType.COG,
location=self.message.stagingLocation,
logger=self.logger,
cfg=self.config)
self.logger.info('Staged %s to %s', cog_generated, url)

# Update the STAC record
result.assets['visual'] = Asset(url, title=staged_filename, media_type=pystac.MediaType.COG,
roles=['visual'])

# Return the STAC record
self.logger.info('Processed item %s', json.dumps(result.to_dict()))
return result
finally:
self.cleanup()
# Clean up any intermediate resources
shutil.rmtree(self.job_data_dir)


def main():
Expand All @@ -100,7 +136,7 @@ def main():
None
"""
parser = argparse.ArgumentParser(prog='podaac-netcdf-converter',
parser = argparse.ArgumentParser(prog='net2cog_harmony',
description='Run the netcdf converter service')
harmony.setup_cli(parser)
args = parser.parse_args()
Expand Down
Loading

0 comments on commit 2f2dd9c

Please sign in to comment.