Skip to content

Commit

Permalink
Merge pull request #26 from nasa/harmony-1125
Browse files Browse the repository at this point in the history
Harmony 1125 - Automatically retry temporary request failures
  • Loading branch information
vinnyinverso authored Jun 6, 2022
2 parents b9ab8d2 + eb9ccd3 commit 2e4d166
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 47 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ OPTIONAL:
health check command. The container is considered unhealthy if the mtime of the file is old -
where 'old' is configurable in the service container. If this variable is not set the path
defaults to '/tmp/health.txt'.
* `MAX_DOWNLOAD_RETRIES`: Number of times to retry HTTP download calls that fail due to transient errors.

OPTIONAL -- Use with CAUTION:

Expand Down
3 changes: 2 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pytest ~= 6.2
pytest-cov ~=2.11
pytest-mock ~=3.5
python-language-server ~= 0.35
responses ~= 0.12
# change this to an official release once the code in this commit is released (https://github.com/getsentry/responses/releases)
responses @ git+https://github.com/getsentry/responses.git@bdc5eff7169a6fba730f2f8427d5ec5a817b1ad5
safety ~= 1.8
pycodestyle ~= 2.7.0
14 changes: 14 additions & 0 deletions harmony/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,17 @@ class ForbiddenException(HarmonyException):

def __init__(self, message=None):
super().__init__(message, 'Forbidden')


class ServerException(HarmonyException):
"""Class for throwing an exception indicating the download failed due to a generic 500 internal server error """

def __init__(self, message=None):
super().__init__(message, 'Server')


class TransientException(HarmonyException):
"""Class for throwing an exception indicating the download failed due to a transient HTTP error (e.g. 502) """

def __init__(self, message=None):
super().__init__(message, 'Transient')
137 changes: 119 additions & 18 deletions harmony/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import re

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from harmony.earthdata import EarthdataAuth, EarthdataSession
from harmony.exceptions import ForbiddenException
from harmony.exceptions import ServerException, ForbiddenException, TransientException
from harmony.logging import build_logger

# Timeout in seconds. Per requests docs, this is not a time limit on
Expand All @@ -32,6 +34,10 @@
# https://2.python-requests.org/en/master/user/quickstart/#timeouts
TIMEOUT = 60

# Error codes for which the retry adapter will retry failed requests.
# Only requests sessions with a mounted retry adapter will exhibit retry behavior.
RETRY_ERROR_CODES = (408, 502, 503, 504)


def is_http(url: str) -> bool:
"""Predicate to determine if the url is an http endpoint.
Expand Down Expand Up @@ -67,6 +73,82 @@ def localhost_url(url, local_hostname):
return url.replace('localhost', local_hostname)


def _mount_retry(session, total_retries, backoff_factor=2):
"""
Instantiates a retry adapter (with exponential backoff) and mounts it to the requests session.
See _retry_adapter function for backoff algo details.
Parameters
----------
session : requests.Session
The session that will have a retry adapter mounted to it.
total_retries: int
Upper limit on the number of times to retry the request
backoff_factor: float
Factor used to determine backoff/sleep time between executions
Returns
-------
The requests.Session
"""
if total_retries < 1:
return session
adapter = _retry_adapter(total_retries, backoff_factor)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session


def _retry_adapter(total_retries, backoff_factor=2):
"""
HTTP adapter for retrying (with exponential backoff) failed requests that have returned a status code
indicating a temporary error.
backoff = {backoff factor} * (2 ** ({retry number} - 1))
where {retry number} = 1, 2, 3, ..., total_retries
With a backoff_factor of 5, the total sleep seconds between executions will be
[0, 10, 20, 40, ...]. There is always 0 seconds before the first retry.
120 seconds is the maximum backoff.
Parameters
----------
total_retries: int
Upper limit on the number of times to retry the request
backoff_factor: float
Factor used to determine backoff/sleep time between executions
Returns
-------
The urllib3 retry adapter
"""
retry = Retry(
total=total_retries,
backoff_factor=backoff_factor,
status_forcelist=RETRY_ERROR_CODES,
raise_on_redirect=False,
raise_on_status=False,
allowed_methods=False)
return HTTPAdapter(max_retries=retry)


def _log_retry_history(logger, response):
"""
Tries to log the error responses received while retrying.
Parameters
----------
logger : logging.Logger
The logger to use.
response: The requests response
"""
try:
for history in response.raw.retries.history:
logger.info(f'Retry history: url={history.url}, error={history.error}, status={history.status}')
except Exception:
return


def _is_eula_error(body: str) -> bool:
"""
Tries to determine if the exception is due to a EULA that the user needs to
Expand Down Expand Up @@ -106,7 +188,7 @@ def _eula_error_message(body: str) -> str:


@lru_cache(maxsize=128)
def _valid(oauth_host: str, oauth_client_id: str, access_token: str) -> bool:
def _valid(oauth_host: str, oauth_client_id: str, access_token: str, total_retries: int) -> bool:
"""
Validates the user access token with Earthdata Login.
Expand All @@ -115,18 +197,21 @@ def _valid(oauth_host: str, oauth_client_id: str, access_token: str) -> bool:
oauth_host: The Earthdata Login hostname
oauth_client_id: The EDL application's client id
access_token: The user's access token to validate
total_retries: int
Upper limit on the number of times to retry the request
Returns
-------
Boolean indicating a valid or invalid user access token
"""
url = f'{oauth_host}/oauth/tokens/user?token={access_token}&client_id={oauth_client_id}'
response = requests.post(url, timeout=TIMEOUT)
with _mount_retry(requests.Session(), total_retries) as session:
response = session.post(url, timeout=TIMEOUT)

if response.ok:
return True
if response.ok:
return True

raise Exception(response.json())
raise Exception(response.json())


@lru_cache(maxsize=128)
Expand All @@ -135,7 +220,7 @@ def _earthdata_session():
return EarthdataSession()


def _download(config, url: str, access_token: str, data, user_agent=None, **kwargs_download_agent):
def _download(config, url: str, access_token: str, data, total_retries: int, user_agent=None, **kwargs_download_agent):
"""Implements the download functionality.
Using the EarthdataSession and EarthdataAuth extensions to the
Expand All @@ -157,6 +242,8 @@ def _download(config, url: str, access_token: str, data, user_agent=None, **kwar
encoded to a query string containing a series of `key=value`
pairs, separated by ampersands. If None (the default), the
request will be sent with an HTTP GET request.
total_retries: int
Upper limit on the number of times to retry the request
user_agent : str
The user agent that is requesting the download.
E.g. harmony/0.0.0 (harmony-sit) harmony-service-lib/4.0 (gdal-subsetter)
Expand All @@ -173,7 +260,7 @@ def _download(config, url: str, access_token: str, data, user_agent=None, **kwar
if user_agent is not None:
headers['user-agent'] = user_agent
auth = EarthdataAuth(config.oauth_uid, config.oauth_password, access_token)
with _earthdata_session() as session:
with _mount_retry(_earthdata_session(), total_retries) as session:
session.auth = auth
if data is None:
return session.get(url, headers=headers, timeout=TIMEOUT, **kwargs_download_agent)
Expand All @@ -184,7 +271,7 @@ def _download(config, url: str, access_token: str, data, user_agent=None, **kwar
return session.post(url, headers=headers, data=data, timeout=TIMEOUT)


def _download_with_fallback_authn(config, url: str, data, user_agent=None, **kwargs_download_agent):
def _download_with_fallback_authn(config, url: str, data, total_retries: int, user_agent=None, **kwargs_download_agent):
"""Downloads the given url using Basic authentication as a fallback
mechanism should the normal EDL Oauth handshake fail.
Expand All @@ -204,6 +291,8 @@ def _download_with_fallback_authn(config, url: str, data, user_agent=None, **kwa
encoded to a query string containing a series of `key=value`
pairs, separated by ampersands. If None (the default), the
request will be sent with an HTTP GET request.
total_retries: int
Upper limit on the number of times to retry the request
user_agent : str
The user agent that is requesting the download.
E.g. harmony/0.0.0 (harmony-sit) harmony-service-lib/4.0 (gdal-subsetter)
Expand All @@ -220,10 +309,12 @@ def _download_with_fallback_authn(config, url: str, data, user_agent=None, **kwa
if user_agent is not None:
headers['user-agent'] = user_agent
auth = requests.auth.HTTPBasicAuth(config.edl_username, config.edl_password)
if data is None:
return requests.get(url, headers=headers, timeout=TIMEOUT, auth=auth, **kwargs_download_agent)
else:
return requests.post(url, headers=headers, data=data, timeout=TIMEOUT, auth=auth)
with _mount_retry(requests.Session(), total_retries) as session:
session.auth = auth
if data is None:
return session.get(url, headers=headers, timeout=TIMEOUT, **kwargs_download_agent)
else:
return session.post(url, headers=headers, data=data, timeout=TIMEOUT)


def _log_download_performance(logger, url, duration_ms, file_size):
Expand Down Expand Up @@ -322,15 +413,17 @@ def download(config, url: str, access_token: str, data, destination_file,
elif stream and not isinstance(buffer_size, int):
raise Exception(f"In download parameters: buffer_size must be integer when stream={stream}.")

if access_token is not None and _valid(config.oauth_host, config.oauth_client_id, access_token):
response = _download(config, url, access_token, data, user_agent, stream=stream)
if access_token is not None and _valid(
config.oauth_host, config.oauth_client_id, access_token, config.max_download_retries):
response = _download(config, url, access_token, data, config.max_download_retries, user_agent, stream=stream)

if response is None or not response.ok:
if config.fallback_authn_enabled:
msg = ('No valid user access token in request or EDL OAuth authentication failed.'
'Fallback authentication enabled: retrying with Basic auth.')
logger.warning(msg)
response = _download_with_fallback_authn(config, url, data, user_agent, stream=stream)
response = _download_with_fallback_authn(
config, url, data, config.max_download_retries, user_agent, stream=stream)

if response.ok:
if not stream:
Expand Down Expand Up @@ -358,8 +451,16 @@ def download(config, url: str, access_token: str, data, destination_file,
raise ForbiddenException(msg)

if response.status_code == 500:
logger.info(f'Unable to download (500) due to: {response.content}')
raise Exception('Unable to download.')
msg = f'Unable to download {url}'
logger.info(f'{msg} (HTTP 500) due to: {response.content}')
raise ServerException(f'{msg} due to an unexpected data server error.')

if response.status_code in RETRY_ERROR_CODES:
msg = f'Download of {url} failed due to a transient error ' +\
f'(HTTP {response.status_code}) after multiple retry attempts.'
_log_retry_history(logger, response)
logger.info(msg)
raise TransientException(msg)

logger.info(f'Unable to download (unknown error) due to: {response.content}')
raise Exception('Unable to download: unknown error.')
26 changes: 17 additions & 9 deletions harmony/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
STAGING_PATH: The base path under which staged files should be placed
Required when using HTTPS, allowing Earthdata Login auth:
OAUTH_HOST: The Earthdata Login (EDL) environment to connect to
OAUTH_HOST: The Earthdata Login (EDL) environment to connect to
OAUTH_CLIENT_ID: The EDL application client id used to acquire an EDL shared access token
OAUTH_UID: The EDL application UID used to acquire an EDL shared access token
OAUTH_PASSWORD: The EDL application password used to acquire an EDL shared access token
Expand All @@ -42,11 +42,12 @@
BACKEND_HOST: The hostname of the Harmony backend. Deprecated / unused by this package.
Optional:
APP_NAME: A name for the service that will appear in log entries.
ENV: The application environment. One of: dev, test. Used for local development.
TEXT_LOGGER: Whether to log in plaintext or JSON. Default: True (plaintext).
HEALTH_CHECK_PATH: The filesystem path that should be `touch`ed to indicate the service is
alive.
APP_NAME: A name for the service that will appear in log entries.
ENV: The application environment. One of: dev, test. Used for local development.
TEXT_LOGGER: Whether to log in plaintext or JSON. Default: True (plaintext).
HEALTH_CHECK_PATH: The filesystem path that should be `touch`ed to indicate the service is
alive.
MAX_DOWNLOAD_RETRIES: Number of times to retry HTTP download calls that fail due to transient errors.
"""

from base64 import b64decode
Expand Down Expand Up @@ -97,7 +98,8 @@
'text_logger',
'health_check_path',
'shared_secret_key',
'user_agent'
'user_agent',
'max_download_retries'
])


Expand All @@ -112,7 +114,8 @@ def _validated_config(config):
'oauth_password',
'oauth_redirect_uri',
'staging_path',
'staging_bucket'
'staging_bucket',
'max_download_retries'
]

unset = [var.upper() for var in required if getattr(config, var) is None]
Expand Down Expand Up @@ -162,6 +165,10 @@ def bool_envvar(name: str, default: bool) -> bool:
value = environ.get(name)
return str.lower(value) == 'true' if value is not None else default

def int_envvar(name: str, default: int) -> int:
value = environ.get(name)
return int(value) if value is not None else default

oauth_redirect_uri = str_envvar('OAUTH_REDIRECT_URI', None)
if oauth_redirect_uri is not None:
oauth_redirect_uri = parse.quote(oauth_redirect_uri)
Expand All @@ -188,7 +195,8 @@ def bool_envvar(name: str, default: bool) -> bool:
text_logger=bool_envvar('TEXT_LOGGER', False),
health_check_path=str_envvar('HEALTH_CHECK_PATH', '/tmp/health.txt'),
shared_secret_key=str_envvar('SHARED_SECRET_KEY', DEFAULT_SHARED_SECRET_KEY),
user_agent=str_envvar('USER_AGENT', 'harmony (unknown version)')
user_agent=str_envvar('USER_AGENT', 'harmony (unknown version)'),
max_download_retries=int_envvar('MAX_DOWNLOAD_RETRIES', 0)
)

if validate:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pynacl ~= 1.4
pystac ~= 0.5.3
python-json-logger ~= 2.0.1
requests ~= 2.24
urllib3 ~= 1.26.9
Loading

0 comments on commit 2e4d166

Please sign in to comment.