diff --git a/README.md b/README.md index b462ff2..cebb00d 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ OPTIONAL: health check command. The container is considered unhealthy if the mtime of the file is old - where 'old' is configurable in the service container. If this variable is not set the path defaults to '/tmp/health.txt'. +* `MAX_DOWNLOAD_RETRIES`: Maximum number of times to retry HTTP download calls that fail due to transient errors (HTTP 408, 502, 503 or 504). Defaults to 0, which disables retries. OPTIONAL -- Use with CAUTION: diff --git a/dev-requirements.txt b/dev-requirements.txt index 95fc005..a3cf62e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -9,6 +9,7 @@ pytest ~= 6.2 pytest-cov ~=2.11 pytest-mock ~=3.5 python-language-server ~= 0.35 -responses ~= 0.12 +# change this to an official release once the code in this commit is released (https://github.com/getsentry/responses/releases) +responses @ git+https://github.com/getsentry/responses.git@bdc5eff7169a6fba730f2f8427d5ec5a817b1ad5 safety ~= 1.8 pycodestyle ~= 2.7.0 diff --git a/harmony/exceptions.py b/harmony/exceptions.py index b1b2243..6190982 100644 --- a/harmony/exceptions.py +++ b/harmony/exceptions.py @@ -26,3 +26,17 @@ class ForbiddenException(HarmonyException): def __init__(self, message=None): super().__init__(message, 'Forbidden') + + +class ServerException(HarmonyException): + """Class for throwing an exception indicating the download failed due to a generic 500 internal server error """ + + def __init__(self, message=None): + super().__init__(message, 'Server') + + +class TransientException(HarmonyException): + """Class for throwing an exception indicating the download failed due to a transient HTTP error (e.g. 
502) """ + + def __init__(self, message=None): + super().__init__(message, 'Transient') diff --git a/harmony/http.py b/harmony/http.py index e7a3a4a..62845e7 100644 --- a/harmony/http.py +++ b/harmony/http.py @@ -19,9 +19,11 @@ import re import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry from harmony.earthdata import EarthdataAuth, EarthdataSession -from harmony.exceptions import ForbiddenException +from harmony.exceptions import ServerException, ForbiddenException, TransientException from harmony.logging import build_logger # Timeout in seconds. Per requests docs, this is not a time limit on @@ -32,6 +34,10 @@ # https://2.python-requests.org/en/master/user/quickstart/#timeouts TIMEOUT = 60 +# Error codes for which the retry adapter will retry failed requests. +# Only requests sessions with a mounted retry adapter will exhibit retry behavior. +RETRY_ERROR_CODES = (408, 502, 503, 504) + def is_http(url: str) -> bool: """Predicate to determine if the url is an http endpoint. @@ -67,6 +73,82 @@ def localhost_url(url, local_hostname): return url.replace('localhost', local_hostname) +def _mount_retry(session, total_retries, backoff_factor=2): + """ + Instantiates a retry adapter (with exponential backoff) and mounts it to the requests session. + See _retry_adapter function for backoff algo details. + + Parameters + ---------- + session : requests.Session + The session that will have a retry adapter mounted to it. 
+ total_retries: int + Upper limit on the number of times to retry the request + backoff_factor: float + Factor used to determine backoff/sleep time between executions + + Returns + ------- + The requests.Session + """ + if total_retries < 1: + return session + adapter = _retry_adapter(total_retries, backoff_factor) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + + +def _retry_adapter(total_retries, backoff_factor=2): + """ + HTTP adapter for retrying (with exponential backoff) failed requests that have returned a status code + indicating a temporary error. + + backoff = {backoff factor} * (2 ** ({retry number} - 1)) + where {retry number} = 1, 2, 3, ..., total_retries + + With a backoff_factor of 5, the total sleep seconds between executions will be + [0, 10, 20, 40, ...]. There is always 0 seconds before the first retry. + 120 seconds is the maximum backoff. + + Parameters + ---------- + total_retries: int + Upper limit on the number of times to retry the request + backoff_factor: float + Factor used to determine backoff/sleep time between executions + + Returns + ------- + The urllib3 retry adapter + """ + retry = Retry( + total=total_retries, + backoff_factor=backoff_factor, + status_forcelist=RETRY_ERROR_CODES, + raise_on_redirect=False, + raise_on_status=False, + allowed_methods=False) + return HTTPAdapter(max_retries=retry) + + +def _log_retry_history(logger, response): + """ + Tries to log the error responses received while retrying. + + Parameters + ---------- + logger : logging.Logger + The logger to use. 
+ response: The requests response + """ + try: + for history in response.raw.retries.history: + logger.info(f'Retry history: url={history.url}, error={history.error}, status={history.status}') + except Exception: + return + + def _is_eula_error(body: str) -> bool: """ Tries to determine if the exception is due to a EULA that the user needs to @@ -106,7 +188,7 @@ def _eula_error_message(body: str) -> str: @lru_cache(maxsize=128) -def _valid(oauth_host: str, oauth_client_id: str, access_token: str) -> bool: +def _valid(oauth_host: str, oauth_client_id: str, access_token: str, total_retries: int) -> bool: """ Validates the user access token with Earthdata Login. @@ -115,18 +197,21 @@ def _valid(oauth_host: str, oauth_client_id: str, access_token: str) -> bool: oauth_host: The Earthdata Login hostname oauth_client_id: The EDL application's client id access_token: The user's access token to validate + total_retries: int + Upper limit on the number of times to retry the request Returns ------- Boolean indicating a valid or invalid user access token """ url = f'{oauth_host}/oauth/tokens/user?token={access_token}&client_id={oauth_client_id}' - response = requests.post(url, timeout=TIMEOUT) + with _mount_retry(requests.Session(), total_retries) as session: + response = session.post(url, timeout=TIMEOUT) - if response.ok: - return True + if response.ok: + return True - raise Exception(response.json()) + raise Exception(response.json()) @lru_cache(maxsize=128) @@ -135,7 +220,7 @@ def _earthdata_session(): return EarthdataSession() -def _download(config, url: str, access_token: str, data, user_agent=None, **kwargs_download_agent): +def _download(config, url: str, access_token: str, data, total_retries: int, user_agent=None, **kwargs_download_agent): """Implements the download functionality. 
Using the EarthdataSession and EarthdataAuth extensions to the @@ -157,6 +242,8 @@ def _download(config, url: str, access_token: str, data, user_agent=None, **kwar encoded to a query string containing a series of `key=value` pairs, separated by ampersands. If None (the default), the request will be sent with an HTTP GET request. + total_retries: int + Upper limit on the number of times to retry the request user_agent : str The user agent that is requesting the download. E.g. harmony/0.0.0 (harmony-sit) harmony-service-lib/4.0 (gdal-subsetter) @@ -173,7 +260,7 @@ def _download(config, url: str, access_token: str, data, user_agent=None, **kwar if user_agent is not None: headers['user-agent'] = user_agent auth = EarthdataAuth(config.oauth_uid, config.oauth_password, access_token) - with _earthdata_session() as session: + with _mount_retry(_earthdata_session(), total_retries) as session: session.auth = auth if data is None: return session.get(url, headers=headers, timeout=TIMEOUT, **kwargs_download_agent) @@ -184,7 +271,7 @@ def _download(config, url: str, access_token: str, data, user_agent=None, **kwar return session.post(url, headers=headers, data=data, timeout=TIMEOUT) -def _download_with_fallback_authn(config, url: str, data, user_agent=None, **kwargs_download_agent): +def _download_with_fallback_authn(config, url: str, data, total_retries: int, user_agent=None, **kwargs_download_agent): """Downloads the given url using Basic authentication as a fallback mechanism should the normal EDL Oauth handshake fail. @@ -204,6 +291,8 @@ def _download_with_fallback_authn(config, url: str, data, user_agent=None, **kwa encoded to a query string containing a series of `key=value` pairs, separated by ampersands. If None (the default), the request will be sent with an HTTP GET request. + total_retries: int + Upper limit on the number of times to retry the request user_agent : str The user agent that is requesting the download. E.g. 
harmony/0.0.0 (harmony-sit) harmony-service-lib/4.0 (gdal-subsetter) @@ -220,10 +309,12 @@ def _download_with_fallback_authn(config, url: str, data, user_agent=None, **kwa if user_agent is not None: headers['user-agent'] = user_agent auth = requests.auth.HTTPBasicAuth(config.edl_username, config.edl_password) - if data is None: - return requests.get(url, headers=headers, timeout=TIMEOUT, auth=auth, **kwargs_download_agent) - else: - return requests.post(url, headers=headers, data=data, timeout=TIMEOUT, auth=auth) + with _mount_retry(requests.Session(), total_retries) as session: + session.auth = auth + if data is None: + return session.get(url, headers=headers, timeout=TIMEOUT, **kwargs_download_agent) + else: + return session.post(url, headers=headers, data=data, timeout=TIMEOUT) def _log_download_performance(logger, url, duration_ms, file_size): @@ -322,15 +413,17 @@ def download(config, url: str, access_token: str, data, destination_file, elif stream and not isinstance(buffer_size, int): raise Exception(f"In download parameters: buffer_size must be integer when stream={stream}.") - if access_token is not None and _valid(config.oauth_host, config.oauth_client_id, access_token): - response = _download(config, url, access_token, data, user_agent, stream=stream) + if access_token is not None and _valid( + config.oauth_host, config.oauth_client_id, access_token, config.max_download_retries): + response = _download(config, url, access_token, data, config.max_download_retries, user_agent, stream=stream) if response is None or not response.ok: if config.fallback_authn_enabled: msg = ('No valid user access token in request or EDL OAuth authentication failed.' 
'Fallback authentication enabled: retrying with Basic auth.') logger.warning(msg) - response = _download_with_fallback_authn(config, url, data, user_agent, stream=stream) + response = _download_with_fallback_authn( + config, url, data, config.max_download_retries, user_agent, stream=stream) if response.ok: if not stream: @@ -358,8 +451,16 @@ def download(config, url: str, access_token: str, data, destination_file, raise ForbiddenException(msg) if response.status_code == 500: - logger.info(f'Unable to download (500) due to: {response.content}') - raise Exception('Unable to download.') + msg = f'Unable to download {url}' + logger.info(f'{msg} (HTTP 500) due to: {response.content}') + raise ServerException(f'{msg} due to an unexpected data server error.') + + if response.status_code in RETRY_ERROR_CODES: + msg = f'Download of {url} failed due to a transient error ' +\ + f'(HTTP {response.status_code}) after multiple retry attempts.' + _log_retry_history(logger, response) + logger.info(msg) + raise TransientException(msg) logger.info(f'Unable to download (unknown error) due to: {response.content}') raise Exception('Unable to download: unknown error.') diff --git a/harmony/util.py b/harmony/util.py index 00d853c..368b7e0 100644 --- a/harmony/util.py +++ b/harmony/util.py @@ -18,7 +18,7 @@ STAGING_PATH: The base path under which staged files should be placed Required when using HTTPS, allowing Earthdata Login auth: - OAUTH_HOST: The Earthdata Login (EDL) environment to connect to + OAUTH_HOST: The Earthdata Login (EDL) environment to connect to OAUTH_CLIENT_ID: The EDL application client id used to acquire an EDL shared access token OAUTH_UID: The EDL application UID used to acquire an EDL shared access token OAUTH_PASSWORD: The EDL application password used to acquire an EDL shared access token @@ -42,11 +42,12 @@ BACKEND_HOST: The hostname of the Harmony backend. Deprecated / unused by this package. 
Optional: - APP_NAME: A name for the service that will appear in log entries. - ENV: The application environment. One of: dev, test. Used for local development. - TEXT_LOGGER: Whether to log in plaintext or JSON. Default: True (plaintext). - HEALTH_CHECK_PATH: The filesystem path that should be `touch`ed to indicate the service is - alive. + APP_NAME: A name for the service that will appear in log entries. + ENV: The application environment. One of: dev, test. Used for local development. + TEXT_LOGGER: Whether to log in plaintext or JSON. Default: True (plaintext). + HEALTH_CHECK_PATH: The filesystem path that should be `touch`ed to indicate the service is + alive. + MAX_DOWNLOAD_RETRIES: Number of times to retry HTTP download calls that fail due to transient errors. """ from base64 import b64decode @@ -97,7 +98,8 @@ 'text_logger', 'health_check_path', 'shared_secret_key', - 'user_agent' + 'user_agent', + 'max_download_retries' ]) @@ -112,7 +114,8 @@ def _validated_config(config): 'oauth_password', 'oauth_redirect_uri', 'staging_path', - 'staging_bucket' + 'staging_bucket', + 'max_download_retries' ] unset = [var.upper() for var in required if getattr(config, var) is None] @@ -162,6 +165,10 @@ def bool_envvar(name: str, default: bool) -> bool: value = environ.get(name) return str.lower(value) == 'true' if value is not None else default + def int_envvar(name: str, default: int) -> int: + value = environ.get(name) + return int(value) if value is not None else default + oauth_redirect_uri = str_envvar('OAUTH_REDIRECT_URI', None) if oauth_redirect_uri is not None: oauth_redirect_uri = parse.quote(oauth_redirect_uri) @@ -188,7 +195,8 @@ def bool_envvar(name: str, default: bool) -> bool: text_logger=bool_envvar('TEXT_LOGGER', False), health_check_path=str_envvar('HEALTH_CHECK_PATH', '/tmp/health.txt'), shared_secret_key=str_envvar('SHARED_SECRET_KEY', DEFAULT_SHARED_SECRET_KEY), - user_agent=str_envvar('USER_AGENT', 'harmony (unknown version)') + 
user_agent=str_envvar('USER_AGENT', 'harmony (unknown version)'), + max_download_retries=int_envvar('MAX_DOWNLOAD_RETRIES', 0) ) if validate: diff --git a/requirements.txt b/requirements.txt index 235df44..b132d46 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ pynacl ~= 1.4 pystac ~= 0.5.3 python-json-logger ~= 2.0.1 requests ~= 2.24 +urllib3 ~= 1.26.9 \ No newline at end of file diff --git a/tests/test_http.py b/tests/test_http.py index 0564093..e7ee1e6 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -3,7 +3,7 @@ import os import harmony.http -from harmony.http import (download, is_http, localhost_url) +from harmony.http import (download, is_http, localhost_url, RETRY_ERROR_CODES) from tests.util import config_fixture EDL_URL = 'https://uat.urs.earthdata.nasa.gov' @@ -86,7 +86,7 @@ def test_download_follows_redirect_to_edl_and_adds_auth_headers( edl_redirect_url, getsize_patched): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) responses.add( responses.GET, resource_server_granule_url, @@ -133,7 +133,7 @@ def test_download_follows_redirect_to_resource_server_with_code( headers=[('Location', resource_server_redirect_url)] ) - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) responses.add( responses.GET, resource_server_redirect_url, @@ -161,7 +161,7 @@ def test_resource_server_redirects_to_granule_url( resource_server_granule_url, getsize_patched): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) responses.add( responses.GET, resource_server_redirect_url, @@ -273,7 +273,7 @@ def test_when_given_a_url_and_data_it_downloads_with_query_parameters( resource_server_granule_url, getsize_patched): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + 
monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) responses.add( responses.POST, resource_server_granule_url, @@ -299,7 +299,7 @@ def test_when_authn_succeeds_it_writes_to_provided_file( response_body_from_granule_url, getsize_patched): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) responses.add( responses.GET, resource_server_granule_url, @@ -325,7 +325,7 @@ def test_when_given_an_access_token_and_error_occurs_it_falls_back_to_basic_auth response_body_from_granule_url, getsize_patched): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) client_id = faker.password(length=22, special_chars=False) access_token = faker.password(length=42, special_chars=False) cfg = config_fixture(oauth_client_id=client_id, fallback_authn_enabled=True) @@ -359,7 +359,7 @@ def test_when_given_an_access_token_and_error_occurs_it_does_not_fall_back_to_ba faker, resource_server_granule_url): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) client_id = faker.password(length=22, special_chars=False) access_token = faker.password(length=42, special_chars=False) cfg = config_fixture(oauth_client_id=client_id, fallback_authn_enabled=False) @@ -413,7 +413,7 @@ def test_download_unknown_error_exception_if_all_else_fails( faker, resource_server_granule_url): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) client_id = faker.password(length=22, special_chars=False) access_token = faker.password(length=42, special_chars=False) cfg = config_fixture(oauth_client_id=client_id, fallback_authn_enabled=False) @@ -485,7 +485,7 @@ def test_user_agent_is_passed_to_request_headers_when_using_edl_auth( resource_server_granule_url, 
getsize_patched): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) responses.add( responses.GET, resource_server_granule_url, @@ -508,7 +508,7 @@ def test_user_agent_is_passed_to_request_headers_when_using_edl_auth_and_post_pa resource_server_granule_url, getsize_patched): - monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c: True) + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) responses.add( responses.POST, resource_server_granule_url, @@ -524,8 +524,72 @@ def test_user_agent_is_passed_to_request_headers_when_using_edl_auth_and_post_pa assert 'User-Agent' in responses.calls[0].request.headers assert user_agent in responses.calls[0].request.headers['User-Agent'] -@pytest.mark.skip(reason='Feature request from EDL team: HARMONY-733') -def test_download_retries_correctly(): - # TODO: Feature request from EDL team to handle EDL failures - # https://bugs.earthdata.nasa.gov/browse/HARMONY-733 - pass +@responses.activate(registry=responses.registries.OrderedRegistry) +@pytest.mark.parametrize('error_code', RETRY_ERROR_CODES) +def test_retries_on_temporary_errors_edl_auth( + monkeypatch, + mocker, + access_token, + resource_server_granule_url, + getsize_patched, + error_code): + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) + rsp1 = responses.get(resource_server_granule_url, body="Error", status=error_code) + rsp2 = responses.get(resource_server_granule_url, body="Error", status=error_code) + rsp3 = responses.get(resource_server_granule_url, body="OK", status=200) + + destination_file = mocker.Mock() + cfg = config_fixture(max_download_retries=5) + + response = download(cfg, resource_server_granule_url, access_token, None, destination_file) + + assert response.status_code == 200 + assert rsp1.call_count == 1 + assert rsp2.call_count == 1 + assert rsp3.call_count == 1 + 
+@responses.activate(registry=responses.registries.OrderedRegistry) +@pytest.mark.parametrize('error_code', RETRY_ERROR_CODES) +def test_retries_on_temporary_errors_basic_auth( + monkeypatch, + mocker, + faker, + access_token, + resource_server_granule_url, + getsize_patched, + error_code): + rsp1 = responses.get(resource_server_granule_url, body="Error", status=error_code) + rsp2 = responses.get(resource_server_granule_url, body="Error", status=error_code) + rsp3 = responses.get(resource_server_granule_url, body="OK", status=200) + + destination_file = mocker.Mock() + client_id = faker.password(length=22, special_chars=False) + cfg = config_fixture(oauth_client_id=client_id, fallback_authn_enabled=True, max_download_retries=5) + + response = download(cfg, resource_server_granule_url, access_token, None, destination_file) + + assert response.status_code == 200 + assert rsp1.call_count == 1 + assert rsp2.call_count == 1 + assert rsp3.call_count == 1 + +@responses.activate(registry=responses.registries.OrderedRegistry) +@pytest.mark.parametrize('error_code', RETRY_ERROR_CODES) +def test_retries_on_temporary_errors_until_limit( + monkeypatch, + mocker, + access_token, + resource_server_granule_url, + getsize_patched, + error_code): + monkeypatch.setattr(harmony.http, '_valid', lambda a, b, c, d: True) + for i in range(0, 5): + responses.get(resource_server_granule_url, body="Error", status=error_code) + + destination_file = mocker.Mock() + cfg = config_fixture(max_download_retries=5) + + with pytest.raises(Exception) as e: + download(cfg, resource_server_granule_url, access_token, None, destination_file) + assert e.type == Exception + assert f'failed due to a transient error (HTTP {error_code}) after multiple retry attempts' in e.value.message diff --git a/tests/test_util_download.py b/tests/test_util_download.py index 512584e..4b3ccb7 100644 --- a/tests/test_util_download.py +++ b/tests/test_util_download.py @@ -3,7 +3,7 @@ import pytest import responses -from 
harmony.exceptions import ForbiddenException +from harmony.exceptions import ForbiddenException, ServerException from harmony import util from tests.util import config_fixture @@ -178,5 +178,5 @@ def test_when_the_url_returns_a_500_it_does_not_raise_a_forbidden_exception_and_ with mock.patch('builtins.open', mock.mock_open()): with pytest.raises(Exception) as e: util.download(url, '/tmp', access_token=access_token, cfg=config) - assert e.type != ForbiddenException and e.type == Exception + assert e.type != ForbiddenException and e.type == ServerException assert len(responses.calls) == 2 diff --git a/tests/util.py b/tests/util.py index 5928e6d..afb293e 100644 --- a/tests/util.py +++ b/tests/util.py @@ -78,7 +78,8 @@ def config_fixture(fallback_authn_enabled=False, oauth_client_id=None, user_agent=None, app_name=None, - text_logger=False): + text_logger=False, + max_download_retries=5): c = util.config(validate=False) return util.Config( # Override @@ -91,6 +92,7 @@ def config_fixture(fallback_authn_enabled=False, oauth_client_id=oauth_client_id, app_name=app_name, text_logger=text_logger, + max_download_retries=max_download_retries, # Default env=c.env, oauth_host=c.oauth_host,