Skip to content

Commit

Permalink
Merge pull request #37 from nasa/harmony-1624
Browse files Browse the repository at this point in the history
Harmony 1624 - Allow service implementations to output multiple STAC catalogs
  • Loading branch information
vinnyinverso authored Nov 7, 2023
2 parents 34b8ca6 + 2dec905 commit 538d88f
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 11 deletions.
12 changes: 6 additions & 6 deletions harmony/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@
class BaseHarmonyAdapter(ABC):
"""
Abstract base class for Harmony service adapters. Service implementations
should inherit from this class and implement the `#invoke(self)` method to
adapt the Harmony message (`self.message`) into a service call and the
output of the service call into a response to Harmony (`self.completed_with_*`)
should inherit from this class and implement the `#invoke(self)` or `#process_item(self, item, source)`
method to adapt the Harmony message (`self.message`) into a service call
Services may choose to override methods that do data downloads and result
staging as well, if they use a different mechanism
Expand Down Expand Up @@ -107,15 +106,16 @@ def invoke(self):
Returns
-------
(harmony.Message, pystac.Catalog)
(harmony.Message, pystac.Catalog | list)
A tuple of the Harmony message, with any processed fields marked as such and
a STAC catalog describing the output
in this implementation, a single STAC catalog describing the output.
(Services overriding this method may return a list of STAC catalogs if desired.)
"""
# New-style processing using STAC
if self.catalog:
return (self.message, self._process_catalog_recursive(self.catalog))

# Current processing using callbacks
# Deprecated, processing using callbacks
self._process_with_callbacks()

def get_all_catalog_items(self, catalog: Catalog, follow_page_links=True):
Expand Down
38 changes: 36 additions & 2 deletions harmony/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import datetime

from pystac import Catalog, CatalogType
from pystac.layout import BestPracticesLayoutStrategy

from harmony.exceptions import CanceledException, HarmonyException
from harmony.message import Message
Expand All @@ -20,6 +21,32 @@
config, create_decrypter)
from harmony.version import get_version
from harmony.aws import is_s3, write_s3
from harmony.s3_stac_io import write


class MultiCatalogLayoutStrategy(BestPracticesLayoutStrategy):
"""
Layout that adheres to what the Harmony server expects
when multiple catalogs are output by a service.
"""

def __init__(self, index):
self.index = index

def get_catalog_href(self, cat, parent_dir, is_root):
"""
Returns the catalog href, using its index number as
part of the file name, e.g. s3://outputs/catalog0.json.
Parameters
----------
parent_dir : string
The parent directory of the catalog
Returns
-------
The catalog href, postfixed with catalog{idx}.json
"""
return path.join(parent_dir, f'catalog{self.index}.json')


def setup_cli(parser):
Expand Down Expand Up @@ -212,8 +239,15 @@ def _invoke(adapter, metadata_dir):
is_s3_metadata_dir = is_s3(metadata_dir)
if not is_s3_metadata_dir:
makedirs(metadata_dir, exist_ok=True)
(out_message, out_catalog) = adapter.invoke()
out_catalog.normalize_and_save(metadata_dir, CatalogType.SELF_CONTAINED)
(out_message, stac_output) = adapter.invoke()
if isinstance(stac_output, list):
for idx, catalog in enumerate(stac_output):
catalog.normalize_and_save(metadata_dir, CatalogType.SELF_CONTAINED, MultiCatalogLayoutStrategy(idx))
json_str = json.dumps([f'catalog{i}.json' for i, c in enumerate(stac_output)])
write(path.join(metadata_dir, 'batch-catalogs.json'), json_str)
write(path.join(metadata_dir, 'batch-count.txt'), f'{len(stac_output)}')
else: # assume stac_output is a single catalog
stac_output.normalize_and_save(metadata_dir, CatalogType.SELF_CONTAINED)

if not is_s3_metadata_dir:
with open(path.join(metadata_dir, 'message.json'), 'w') as file:
Expand Down
54 changes: 51 additions & 3 deletions tests/test_cli_stac.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
from tempfile import mkdtemp
from datetime import datetime
import shutil
import unittest
import json

from pystac import Catalog, CatalogType
from pystac import Catalog, CatalogType, Item

from harmony import cli, BaseHarmonyAdapter
from harmony.exceptions import ForbiddenException
Expand All @@ -18,14 +20,32 @@ class MockAdapter(BaseHarmonyAdapter):
def invoke(self):
MockAdapter.message = self.message
return (self.message, self.catalog)

class MockMultiCatalogOutputAdapter(BaseHarmonyAdapter):
message = None
"""
Dummy class to mock adapter calls, performing a no-op service
that returns multiple STAC catologs instead of one
"""
def invoke(self):
MockAdapter.message = self.message
catalogs = [
Catalog('a', ''), Catalog('b', ''), Catalog('c', '')]
for cat in catalogs:
items = [
Item(f'item-1-from-catalog-{cat.id}', None, [0, 0, 1, 1],
datetime.strptime('09/19/22 13:55:26', '%m/%d/%y %H:%M:%S'), {}),
Item(f'item-2-from-catalog-{cat.id}', None, [0, 0, 1, 2],
datetime.strptime('09/19/22 13:55:26', '%m/%d/%y %H:%M:%S'), {})
]
cat.add_items(items)
return (self.message, catalogs)


class TestCliInvokeAction(unittest.TestCase):
def setUp(self):
self.workdir = mkdtemp()
self.inputdir = mkdtemp()
self.catalog = Catalog('test-id', 'test catalog')
self.catalog.normalize_and_save(self.inputdir, CatalogType.SELF_CONTAINED)
self.config = config_fixture()
print(self.config)

Expand Down Expand Up @@ -109,6 +129,34 @@ def invoke(self):
with open(os.path.join(self.workdir, 'error.json')) as file:
self.assertEqual(file.read(), '{"error": "Service request failed with an unknown error", "category": "Unknown"}')

def test_when_multi_catalog_output_it_saves_with_particular_layout(self):
with cli_parser(
'--harmony-action', 'invoke',
'--harmony-input', '{"test": "input"}',
'--harmony-sources', 'example/source/catalog.json',
'--harmony-metadata-dir', self.workdir) as parser:
args = parser.parse_args()
cli.run_cli(parser, args, MockMultiCatalogOutputAdapter, cfg=self.config)
for idx in range(3):
cat = Catalog.from_file(os.path.join(self.workdir, f'catalog{idx}.json'))
cat_root = cat.get_single_link('root')
self.assertEqual(cat_root.get_href(), f'./catalog{idx}.json')
item_hrefs = [l.get_href() for l in cat.get_links('item')]
self.assertTrue(f'./item-1-from-catalog-{cat.id}/item-1-from-catalog-{cat.id}.json' in item_hrefs)
self.assertTrue(f'./item-2-from-catalog-{cat.id}/item-2-from-catalog-{cat.id}.json' in item_hrefs)
item = Item.from_file(os.path.join(self.workdir, f'./item-1-from-catalog-{cat.id}/item-1-from-catalog-{cat.id}.json'))
item_root_href = item.get_single_link('root').get_href()
item_parent_href = item.get_single_link('parent').get_href()
self.assertTrue(item_parent_href == item_root_href)
self.assertEqual(item_root_href, f'../catalog{idx}.json')
self.assertEqual(item_parent_href, f'../catalog{idx}.json')
with open(os.path.join(self.workdir, 'batch-count.txt')) as file:
self.assertEqual(file.read(), '3')
with open(os.path.join(self.workdir, 'batch-catalogs.json')) as file:
self.assertEqual(json.loads(file.read()),
["catalog0.json",
"catalog1.json",
"catalog2.json"])

if __name__ == '__main__':
unittest.main()

0 comments on commit 538d88f

Please sign in to comment.