Skip to content

Commit

Permalink
Merge pull request #660 Initial implementation of standard retry
Browse files Browse the repository at this point in the history
  • Loading branch information
DyfanJones authored Sep 1, 2023
2 parents 5bc1dad + c3b67e3 commit ac7f5b2
Show file tree
Hide file tree
Showing 16 changed files with 522 additions and 299 deletions.
1 change: 0 additions & 1 deletion make.paws/R/paws_helper_fun.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"endpoint_resolver",
"enforce_should_retry_check",
"disable_ssl",
"max_retries",
"retryer",
"disable_param_validation",
"disable_compute_checksums",
Expand Down
1 change: 1 addition & 0 deletions make.paws/inst/templates/service_parameter_helper.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#' @param close_connection Immediately close all HTTP connections.
#' @param connect_timeout The time in seconds till a timeout exception is thrown
#' when attempting to make a connection. The default is 60 seconds.
#' @param max_retries Max number of retries call AWS API (default set to 3).
#' @param s3_force_path_style Set this to `true` to force the request to use path-style
#' addressing, i.e. `http://s3.amazonaws.com/BUCKET/KEY`.
#' @param sts_regional_endpoint Set sts regional endpoint resolver to regional or
Expand Down
3 changes: 2 additions & 1 deletion paws.common/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Collate:
'service.R'
'custom_dynamodb.R'
'custom_rds.R'
'tags.R'
'xmlutil.R'
'stream.R'
'custom_s3.R'
Expand All @@ -74,9 +75,9 @@ Collate:
'paginate.R'
'populate.R'
'populateutil.R'
'tags.R'
'queryutil.R'
'request.R'
'retry.R'
'service_parameter_helper.R'
'signer_v4.R'
'signer_s3.R'
Expand Down
3 changes: 2 additions & 1 deletion paws.common/NEWS.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# paws.common 0.5.9
* use known interface when parsing xml (@619) improving performance by 3-6x. thanks to @mgirlich for raising, implementing initial method and testing.
* use known interface when parsing xml (@619) improving performance by 3-6x. Thanks to @mgirlich for raising, implementing initial method and testing.
* add expiration parameter to creds
* add signature_version to config
* add the ability to paginate paws methods (#30)
* overwrite file destination when writing to disk. This mimics python's boto3 sdk behaviour.
* add standard retry handler (#520). Thanks to @wlandau for testing.

# paws.common 0.5.8
* fix mismatch apparent method as.list.struct (#634)
Expand Down
2 changes: 1 addition & 1 deletion paws.common/R/client.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Config <- struct(
region = "",
disable_ssl = FALSE,
close_connection = FALSE,
max_retries = -1,
max_retries = 3,
connect_timeout = 60,
retryer = NULL,
disable_param_validation = FALSE,
Expand Down
42 changes: 2 additions & 40 deletions paws.common/R/paginate.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#' in order to retrieve the next page of results.
#'
#' @param Operation The operation for example an s3 operation: \code{svc$list_buckets()}
#' @param MaxRetries Max number of retries call AWS API.
#' @param PageSize The size of each page.
#' @param MaxItems Limits the maximum number of total returned items returned while paginating.
#' @param StartingToken Can be used to modify the starting marker or token of a paginator.
Expand All @@ -38,7 +37,6 @@
#' @name paginate
#' @export
paginate <- function(Operation,
MaxRetries = 5,
PageSize = NULL,
MaxItems = NULL,
StartingToken = NULL) {
Expand All @@ -59,7 +57,7 @@ paginate <- function(Operation,
no_items <- 0
result <- list()
while (!identical(fn[[paginator$input_token[[1]]]], character(0))) {
resp <- retry_api_call(eval(fn), retries = MaxRetries)
resp <- eval(fn)
new_tokens <- get_tokens(resp, paginator$output_token)
for (i in seq_along(new_tokens)) {
fn[[paginator$input_token[[i]]]] <- new_tokens[[i]]
Expand Down Expand Up @@ -87,7 +85,6 @@ paginate <- function(Operation,
paginate_lapply <- function(Operation,
FUN,
...,
MaxRetries = 5,
PageSize = NULL,
MaxItems = NULL,
StartingToken = NULL) {
Expand All @@ -109,7 +106,6 @@ paginate_lapply <- function(Operation,
paginator = fn_update$paginator,
FUN = FUN,
...,
MaxRetries = MaxRetries,
MaxItems = MaxItems
)
return(result)
Expand All @@ -121,7 +117,6 @@ paginate_sapply <- function(Operation,
FUN,
...,
simplify = TRUE,
MaxRetries = 5,
PageSize = NULL,
MaxItems = NULL,
StartingToken = NULL) {
Expand All @@ -143,7 +138,6 @@ paginate_sapply <- function(Operation,
paginator = fn_update$paginator,
FUN = FUN,
...,
MaxRetries = MaxRetries,
MaxItems = MaxItems
)

Expand Down Expand Up @@ -240,13 +234,12 @@ paginate_xapply <- function(
paginator,
FUN,
...,
MaxRetries = 5,
MaxItems = NULL) {
primary_result_key <- paginator$result_key[[1]]
no_items <- 0
result <- list()
while (!identical(fn[[paginator$input_token[[1]]]], character(0))) {
resp <- retry_api_call(eval(fn), retries = MaxRetries)
resp <- eval(fn)
new_tokens <- get_tokens(resp, paginator$output_token)
for (i in seq_along(new_tokens)) {
fn[[paginator$input_token[[i]]]] <- new_tokens[[i]]
Expand Down Expand Up @@ -333,34 +326,3 @@ get_token_len <- function(resp, token) {
}
)
}

# See https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
retry_api_call <- function(expr, retries) {
for (i in seq_len(retries + 1)) {
tryCatch(
{
return(eval.parent(substitute(expr)))
},
error = function(err) {
msg <- err$message

# Only Retry rate exceeded errors.
if (grepl("rate exceeded", msg, ignore.case = T)) {
exp_back_off(err, i, retries)
} else {
stop(err)
}
}
)
}
}

# Retry with exponential backoff with jitter
exp_back_off <- function(error, i, retries) {
if (i == (retries + 1)) {
stop(error)
}
time <- min(runif(1) * 2^i, 20)
log_error("Request failed. Retrying in %s seconds...", time)
Sys.sleep(time)
}
9 changes: 2 additions & 7 deletions paws.common/R/request.R
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,11 @@ send_request <- function(request) {
request <- unmarshal_meta(request)
request <- validate_response(request)

if (!is.null(request$error)) {
request <- unmarshal_error(request)
stop(aws_error(request$error))
}
request <- retry(request)

request <- unmarshal(request)

out <- get_request_output(request)

return(out)
return(request[["data"]])
}

#-------------------------------------------------------------------------------
Expand Down
98 changes: 98 additions & 0 deletions paws.common/R/retry.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#' @include util.R

# Retry attempts for an expanded list of errors/exceptions:
retryable_codes <- c(
# Transient errors/exceptions
"RequestTimeout",
"RequestTimeoutException",
"PriorRequestNotComplete",
"ConnectionError",
"HTTPClientError",

# Service-side throttling/limit errors and exceptions
"Throttling",
"ThrottlingException",
"ThrottledException",
"RequestThrottledException",
"TooManyRequestsException",
"ProvisionedThroughputExceededException",
"TransactionInProgressException",
"RequestLimitExceeded",
"BandwidthLimitExceeded",
"LimitExceededException",
"RequestThrottled",
"SlowDown",
"EC2ThrottledException"
)

# https://github.com/boto/boto3/blob/0b82bf9843ad6d350b48442c47f4a484a886ee3f/docs/source/guide/retries.rst#standard-retry-mode
standard_retry_handler <- function(request) {
# Handle first api call
# exit if no error
if (is.null(request[["error"]])) {
return(request)
}

request <- unmarshal_error(request)
error <- aws_error(request[["error"]])
retries <- request[["config"]][["max_retries"]]
exit_retries <- retries + 1

# If error is not retryable raise error
if (!check_if_retryable(error) || retries == 0) {
stop(error)
} else {
# initial backoff
exp_back_off(error, 1, exit_retries)
}

# retry api call
for (i in seq.int(2, exit_retries)) {
tryCatch({
request <- sign(request)
if (!is.null(request[["error"]])) {
stop(aws_error(request[["error"]]))
}
request <- send(request)
request <- unmarshal_meta(request)
request <- validate_response(request)

if (!is.null(request[["error"]])) {
request <- unmarshal_error(request)
stop(aws_error(request[["error"]]))
}
return(request)
}, paws_error = function(error) {
if (check_if_retryable(error)) {
exp_back_off(error, i, exit_retries)
} else {
stop(error)
}
})
}
}

check_if_retryable <- function(error) {
error_code <- error[["error_response"]][["Code"]] %||% error[["error_response"]][["__type"]]
status_code <- error[["status_code"]]
retryable <- FALSE

if (!is_empty(error_code) && error_code %in% retryable_codes) {
retryable <- TRUE
# Retry attempts on nondescriptive, transient error codes. Specifically, these HTTP status codes: 500, 502, 503, 504.
} else if (!is_empty(status_code) && status_code %in% c(500, 502, 503, 504)) {
retryable <- TRUE
}
return(retryable)
}

# Any retry attempt will include an exponential backoff by a base factor of 2 for a maximum backoff time of 20 seconds.
# Retry with exponential backoff with jitter
exp_back_off <- function(error, i, retries) {
if (i == retries) {
stop(error)
}
time <- min(runif(1) * 2^i, 20)
log_error("Request failed. Retrying in %.3f seconds...", time)
Sys.sleep(time)
}
3 changes: 3 additions & 0 deletions paws.common/R/service.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ new_handlers <- function(protocol, signer) {
),
unmarshal_error = HandlerList(
handler(protocol, "unmarshal_error")
),
retry = HandlerList(
standard_retry_handler
)
)
return(handlers)
Expand Down
61 changes: 29 additions & 32 deletions paws.common/R/service_parameter_helper.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#' @param close_connection Immediately close all HTTP connections.
#' @param connect_timeout The time in seconds till a timeout exception is thrown
#' when attempting to make a connection. The default is 60 seconds.
#' @param max_retries Max number of retries call AWS API (default set to 3).
#' @param s3_force_path_style Set this to `true` to force the request to use path-style
#' addressing, i.e. `http://s3.amazonaws.com/BUCKET/KEY`.
#' @param sts_regional_endpoint Set sts regional endpoint resolver to regional or
Expand Down Expand Up @@ -63,49 +64,45 @@
#'
#' # set service parameter access_key_id and secret_access_key using using lists
#' config(
#' credentials = list(
#' creds = list(
#' access_key_id = "dummy",
#' secret_access_key = "secret"
#' credentials = list(
#' creds = list(
#' access_key_id = "dummy",
#' secret_access_key = "secret"
#' )
#' )
#' )
#' )
#'
#' @name set_service_parameter
#' @export
config <- function(
credentials = list(creds = list(
access_key_id = "",
secret_access_key = "", session_token = "", access_token = "",
expiration = Inf
), profile = "", anonymous = FALSE), endpoint = "",
region = "", close_connection = FALSE, connect_timeout = 60,
s3_force_path_style = FALSE, sts_regional_endpoint = "",
signature_version = "") {
.args <- as.list(environment(), all.names = TRUE)
class(.args) <- "struct"
return(.args)
config <- function (credentials = list(creds = list(access_key_id = "",
secret_access_key = "", session_token = "", access_token = "",
expiration = Inf), profile = "", anonymous = FALSE), endpoint = "",
region = "", close_connection = FALSE, max_retries = 3, connect_timeout = 60,
s3_force_path_style = FALSE, sts_regional_endpoint = "",
signature_version = "")
{
.args <- as.list(environment(), all.names = TRUE)
class(.args) <- "struct"
return(.args)
}

#' @rdname set_service_parameter
#' @export
credentials <- function(
creds = list(
access_key_id = "", secret_access_key = "",
session_token = "", access_token = "", expiration = Inf
),
profile = "", anonymous = FALSE) {
.args <- as.list(environment(), all.names = TRUE)
class(.args) <- "struct"
return(.args)
credentials <- function (creds = list(access_key_id = "", secret_access_key = "",
session_token = "", access_token = "", expiration = Inf),
profile = "", anonymous = FALSE)
{
.args <- as.list(environment(), all.names = TRUE)
class(.args) <- "struct"
return(.args)
}

#' @rdname set_service_parameter
#' @export
creds <- function(
access_key_id = "", secret_access_key = "", session_token = "",
access_token = "", expiration = Inf) {
.args <- as.list(environment(), all.names = TRUE)
class(.args) <- "struct"
return(.args)
creds <- function (access_key_id = "", secret_access_key = "", session_token = "",
access_token = "", expiration = Inf)
{
.args <- as.list(environment(), all.names = TRUE)
class(.args) <- "struct"
return(.args)
}
Loading

0 comments on commit ac7f5b2

Please sign in to comment.