Skip to content

Commit

Permalink
Debug LS and S2 STAC iteration (#50)
Browse files Browse the repository at this point in the history
* Debug LS and S2 STAC iteration

* Update deps

* Dry run Sentinel and Landsat with smaller queries

* Trigger CI

* Fix typo

* Improved workflow

* Start parallelizing

* Fix type annotations

* Move func

* Use __main__ guard

* Use executor.map

* Pass dry_run

* More debug printing

* Iterate directly over Landsat S3 bucket

* Lint

* Set AWS_REQUEST_PAYER

* Update ingest Action
  • Loading branch information
banesullivan authored Jan 24, 2022
1 parent 4b2f7b9 commit 951d029
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 84 deletions.
17 changes: 9 additions & 8 deletions .github/workflows/data_ingest.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
name: STAC Data Ingest
on:
push:
branches:
- foo
# schedule:
# # Run every wednesday morning
# - cron: "0 0 * * WED"
workflow_dispatch:
schedule:
# Run every wednesday morning
- cron: "0 0 * * WED"
jobs:
ingest:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
script: ['drop1', 'worldview'] # 'landsat', 'sentinel' TODO: add back when these servers are working
# Landsat and Sentinel must be run on self hosted runner (i.e., locally on Bane's machine)
script: ['drop1', 'worldview', ] #'landsat', 'sentinel']
steps:
- uses: actions/checkout@v2
- name: Set up Python
Expand All @@ -23,12 +22,14 @@ jobs:
run: |
pip install --upgrade pip
cd rgd-watch-client
pip install -e . boto3 mock
pip install -e .
cd ..
pip install -r ./scripts/requirements.txt
- name: Run STAC Ingest
run: |
python scripts/${{ matrix.script }}.py
env:
AWS_REQUEST_PAYER: requester
AWS_ACCESS_KEY_ID: ${{ secrets.WATCH_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.WATCH_AWS_SECRET_ACCESS_KEY }}
SMART_STAC_API_KEY: ${{ secrets.SMART_STAC_API_KEY }}
Expand Down
3 changes: 3 additions & 0 deletions rgd-watch-client/rgd_watch_client/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def post_stac_file(
print(f'Record already exists with ID: {f["id"]}')
return f

if debug:
print('Record being created...')

return self.session.post('watch/stac_file', json={'file': checksum_file['id']}).json()

def reprocess_stac_file(self, id: Union[int, str]):
Expand Down
23 changes: 13 additions & 10 deletions scripts/drop1.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from watch_helpers import post_stac_items_from_s3_iter

bucket = 'kitware-smart-watch-data'
prefix = 'processed/ta1/drop1/coreg_and_brdf/'
collection = 'drop1/coreg_and_brdf'
region = 'us-west-2'
post_stac_items_from_s3_iter(bucket, prefix, collection, region=region)
if __name__ == '__main__':
bucket = 'kitware-smart-watch-data'
prefix = 'processed/ta1/drop1/coreg_and_brdf/'
collection = 'drop1/coreg_and_brdf'
region = 'us-west-2'
print(collection)
post_stac_items_from_s3_iter(bucket, prefix, collection, region=region)

bucket = 'kitware-smart-watch-data'
prefix = 'processed/ta1/drop1/mtra/'
collection = 'drop1/mtra'
region = 'us-west-2'
post_stac_items_from_s3_iter(bucket, prefix, collection, region=region)
bucket = 'kitware-smart-watch-data'
prefix = 'processed/ta1/drop1/mtra/'
collection = 'drop1/mtra'
region = 'us-west-2'
print(collection)
post_stac_items_from_s3_iter(bucket, prefix, collection, region=region)
48 changes: 35 additions & 13 deletions scripts/landsat.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
from watch_helpers import post_stac_items_from_server
from watch_helpers import post_stac_items_from_s3_iter

host_url = 'https://landsatlook.usgs.gov/stac-server/collections/landsat-c2l1/items'
collection = 'landsat-c2l1'
post_stac_items_from_server(host_url, collection)
if __name__ == '__main__':
# min_date = datetime(2013, 1, 1) # Arbitrarily chosen
# max_date = datetime.today()
# host_url = 'https://landsatlook.usgs.gov/stac-server/'
#
# collection = 'landsat-c2l1'
# print(collection)
# post_stac_items_from_server(host_url, collection, min_date=min_date, max_date=max_date)
#
# collection = 'landsat-c2l2-sr'
# print(collection)
# post_stac_items_from_server(host_url, collection, min_date=min_date, max_date=max_date)

host_url = 'https://landsatlook.usgs.gov/stac-server/collections/landsat-c2l2-sr/items'
collection = 'landsat-c2l2-sr'
post_stac_items_from_server(host_url, collection)
# host_url = 'https://api.smart-stac.com/'
# collection = 'smart-landsat-c2l1'
# post_stac_items_from_server(host_url, collection, min_date=min_date, max_date=max_date)
#
# collection = 'smart-landsat-c2l2-sr'
# post_stac_items_from_server(host_url, collection, min_date=min_date, max_date=max_date)

host_url = 'https://api.smart-stac.com/collections/landsat-c2l1/items'
collection = 'smart-landsat-c2l1'
post_stac_items_from_server(host_url, collection)
bucket = 'usgs-landsat'
region = 'us-west-2'
include_regex: str = r'^.*stac\.json'

host_url = 'https://api.smart-stac.com/collections/landsat-c2l2-sr/items'
collection = 'smart-landsat-c2l2-sr'
post_stac_items_from_server(host_url, collection)
prefix = 'collection02/level-1/'
collection = 'landsat-c2l1'
print(collection)
post_stac_items_from_s3_iter(
bucket, prefix, collection, include_regex=include_regex, region=region
)

prefix = 'collection02/level-2/'
collection = 'landsat-c2l2-sr'
print(collection)
post_stac_items_from_s3_iter(
bucket, prefix, collection, include_regex=include_regex, region=region
)
3 changes: 3 additions & 0 deletions scripts/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
boto3
mock
pystac-client
19 changes: 13 additions & 6 deletions scripts/sentinel.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from datetime import datetime

from watch_helpers import post_stac_items_from_server

host_url = 'https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l1c/items'
collection = 'sentinel-s2-l1c'
post_stac_items_from_server(host_url, collection)
if __name__ == '__main__':
min_date = datetime(2015, 12, 31) # First available found
max_date = datetime.today()
host_url = 'https://earth-search.aws.element84.com/v0/'

collection = 'sentinel-s2-l1c'
print(collection)
post_stac_items_from_server(host_url, collection, min_date=min_date, max_date=max_date)

host_url = 'https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a/items'
collection = 'sentinel-s2-l2a'
post_stac_items_from_server(host_url, collection)
collection = 'sentinel-s2-l2a'
print(collection)
post_stac_items_from_server(host_url, collection, min_date=min_date, max_date=max_date)
107 changes: 65 additions & 42 deletions scripts/watch_helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,90 @@
from contextlib import suppress
import concurrent.futures
from datetime import datetime, timedelta

# import multiprocessing
import os
import re
from typing import Generator
from typing import Generator, List

import boto3
import mock
import requests
from pystac_client import Client
from rgd_watch_client import create_watch_client


def iter_matching_objects(
def iter_matching_object_urls(
s3_client,
bucket: str,
prefix: str,
include_regex: str,
) -> Generator[dict, None, None]:
) -> Generator[str, None, None]:
paginator = s3_client.get_paginator('list_objects_v2')
page_iter = paginator.paginate(Bucket=bucket, Prefix=prefix, RequestPayer='requester')
include_pattern = re.compile(include_regex)

for page in page_iter:
for obj in page['Contents']:
if include_pattern.match(obj['Key']):
yield obj
yield f's3://{bucket}/{obj["Key"]}' # TODO: modified date


def iter_stac_items(url: str, api_key: str = None) -> Generator[dict, None, None]:
stack = [url]
def get_stac_item_self_link(links):
    """Return the href of the first link whose ``rel`` is ``'self'``.

    Parameters:
        links: iterable of STAC link dicts, each expected to carry
            ``'rel'`` and ``'href'`` keys.

    Raises:
        ValueError: when no link in *links* has ``rel == 'self'``.
    """
    sentinel = object()
    match = next((entry for entry in links if entry['rel'] == 'self'), sentinel)
    if match is sentinel:
        raise ValueError('No self link found')
    return match['href']


def iter_stac_item_urls(
url: str, collections: List[str], min_date: datetime, max_date: datetime, api_key: str = None
) -> Generator[str, None, None]:
if max_date <= min_date:
raise ValueError('End date must be after start date.')

headers = {}
if api_key:
# Specific to SMART catalogs
headers['x-api-key'] = api_key

while stack:
url = stack.pop()
collection = requests.get(url, headers=headers).json()
# see if there's pages
with suppress(KeyError):
for link in collection['links']:
if link['rel'] == 'next':
stack.append(link['href'])
break
# iterate through items
for item in collection['features']:
yield item


def get_stac_item_self_link(links):
for link in links:
if link['rel'] == 'self':
return link['href']
raise ValueError('No self link found')
date = min_date
catalog = Client.open(url, headers=headers)
delta = timedelta(days=1)
while date <= max_date:
print(date) # DEBUG
begin_time = datetime.now()
results = catalog.search(collections=collections, datetime=[date, date + delta])
for item in results.get_items():
yield get_stac_item_self_link(item.to_dict()['links'])
print(f'\t{datetime.now() - begin_time}')
date += delta


def get_client(dry_run: bool = False):
if dry_run:
if True: # dry_run: TODO
return mock.Mock()
return create_watch_client()


class Handler:
    """Callable worker that posts one STAC file URL per invocation.

    Each call obtains a fresh client via ``get_client`` (visible in the
    ``__call__`` body), so no client state is shared between calls.
    """

    def __init__(self, collection: str, dry_run: bool = False):
        # Target collection name and whether a mock client should be used.
        self.collection = collection
        self.dry_run = dry_run

    def __call__(self, url: str):
        # Build the client lazily per call, then post the STAC file URL.
        get_client(self.dry_run).watch.post_stac_file(url, self.collection)


# def handle_posts(iter_func, collection, dry_run, **kwargs):
# pool = multiprocessing.Pool(multiprocessing.cpu_count())
# pool.map(Handler(collection, dry_run), iter_func(**kwargs))


def handle_posts(iter_func, collection, dry_run, **kwargs):
    """Post every URL yielded by *iter_func* using a pool of worker threads.

    Parameters:
        iter_func: generator function yielding STAC file URLs to post.
        collection: collection name handed to each ``Handler`` call.
        dry_run: forwarded to ``Handler``; selects the mock client path.
        **kwargs: forwarded verbatim to *iter_func*.

    The posting work is I/O-bound, so a thread pool (32 workers, as in the
    original tuning) overlaps the network waits.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
        # Iterate the map result: executor.map only re-raises a worker's
        # exception when its result is consumed, so without this loop a
        # failed post would be silently discarded.
        for _ in executor.map(Handler(collection, dry_run), iter_func(**kwargs)):
            pass


def post_stac_items_from_s3_iter(
bucket: str,
prefix: str,
Expand All @@ -74,28 +99,26 @@ def post_stac_items_from_s3_iter(
session = boto3.Session(**boto3_params)
s3_client = session.client('s3')

client = get_client(dry_run)
i = 0
for obj in iter_matching_objects(s3_client, bucket, prefix, include_regex):
url = f's3://{bucket}/{obj["Key"]}'
client.watch.post_stac_file(url=url, collection=collection, debug=True)
i += 1
print(f'Handled {i} STACFile records.')
kwargs = dict(s3_client=s3_client, bucket=bucket, prefix=prefix, include_regex=include_regex)
return handle_posts(iter_matching_object_urls, collection, dry_run, **kwargs)


def post_stac_items_from_server(
host_url: str,
collection: str,
min_date: datetime,
max_date: datetime,
api_key: str = None,
dry_run: bool = False,
):
if api_key is None:
api_key = os.environ.get('SMART_STAC_API_KEY', None)

client = get_client(dry_run)
i = 0
for item in iter_stac_items(host_url, api_key=api_key):
url = get_stac_item_self_link(item['links'])
client.watch.post_stac_file(url=url, collection=collection, debug=True)
i += 1
print(f'Handled {i} STACFile records.')
kwargs = dict(
url=host_url,
collections=[collection],
min_date=min_date,
max_date=max_date,
api_key=api_key,
)
return handle_posts(iter_stac_item_urls, collection, dry_run, **kwargs)
12 changes: 7 additions & 5 deletions scripts/worldview.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from watch_helpers import post_stac_items_from_s3_iter

bucket = 'smart-imagery'
prefix = 'worldview-nitf/'
collection = 'WorldView'
region = 'us-west-2'
if __name__ == '__main__':
bucket = 'smart-imagery'
prefix = 'worldview-nitf/'
collection = 'WorldView'
region = 'us-west-2'

post_stac_items_from_s3_iter(bucket, prefix, collection, region=region)
print(collection)
post_stac_items_from_s3_iter(bucket, prefix, collection, region=region)

0 comments on commit 951d029

Please sign in to comment.