diff --git a/docs/paginators.md b/docs/paginators.md new file mode 100644 index 000000000..d4a80a222 --- /dev/null +++ b/docs/paginators.md @@ -0,0 +1,119 @@ +# Paginators + +> Some AWS operations return results that are incomplete and require subsequent requests in order to attain the entire result set. The process of sending subsequent requests to continue where a previous request left off is called pagination. For example, the list_objects operation of Amazon S3 returns up to 1000 objects at a time, and you must send subsequent requests with the appropriate Marker in order to retrieve the next page of results. +(https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#paginators) + + +As of `paws v0.4.0+` paginators are supported within `paws`. + +## Basic Usage + +A paginator can be applied to a `paws` operation. `paws` support 3 different methods of paginator (`paginate`, `paginate_lapply`, `paginate_sapply`). + +### `paginate`: +Return all response from the `paws` operation. + +```r +library(paws) + +svc <- s3(region = "us-west-2") + +results <- paginate(svc$list_objects(Bucket = "my-bucket")) +``` + +### `paginate_lapply`: +Allows you to apply a function on each returning response. +```r +library(paws) + +svc <- s3(region = "us-west-2") + +results <- paginate_lapply(svc$list_objects(Bucket = "my-bucket"), \(resp) resp$Contents) +``` + +### `paginate_sapply`: +Allows you to apply a function on each returning response, however the final result is simplified similar to `base::sapply`. +```r +library(paws) + +svc <- s3(region = "us-west-2") + +results <- paginate_sapply( + svc$list_objects(Bucket = "my-bucket"), + \(resp) resp$Contents, + simplify = T +) +``` + +## Customizing page Iterators + +You can modify the operation by + +* `MaxItems:` + Limits the maximum number of total returned items returned while paginating. +* `StartingToken:` + Can be used to modify the starting marker or token of a paginator. This argument if useful for resuming pagination from a previous token or starting pagination at a known position. +* `PageSize:` + Controls the number of items returned per page of each result. + + +### `paginate` +```r +library(paws) + +svc <- s3(region = "us-west-2") + +results <- paginate(svc$list_objects(Bucket = "my-bucket"), MaxItems = 10) +``` + +#### `paginate_lapply` +```r +library(paws) + +svc <- s3(region = "us-west-2") + +results <- paginate_lapply(svc$list_objects(Bucket = "my-bucket"), \(page) page$Contents) +``` + +#### `paginate_sapply` +```r +library(paws) + +svc <- s3(region = "us-west-2") + +results <- paginate_lapply(svc$list_objects(Bucket = "my-bucket"), \(page) page$Contents) +``` + +## Piping: + +`paws` paginator support R native piping `|>`. However we currently don't support magrittr piping `%>%`. + +```r +library(paws) +library(magrittr) + +svc <- s3(region = "us-west-2") + +# Will Work +results <- svc$list_objects(Bucket = "my-bucket") |> paginate(MaxItems = 10) + +# Will error: +results <- svc$list_objects(Bucket = "my-bucket") %>% paginate(MaxItems = 10) +``` + + +## Filtering results: + +You can filter the paginator results by limiting the response for the paws operation. For example `list_objects` accepts `Prefix` parameter to filter page server-side before returning to `R`. + +```r +library(paws) + +svc <- s3(region = "us-west-2") + +kwargs <- list( + Bucket='my-bucket', + Prefix='foo/baz' +) +result <- do.call(svc$list_objects, kwargs) |> paginate_lapply(\(page) page$Contents) +``` diff --git a/make.paws/R/cran_category.R b/make.paws/R/cran_category.R index 045ac3264..a1d1a9e43 100644 --- a/make.paws/R/cran_category.R +++ b/make.paws/R/cran_category.R @@ -1,6 +1,8 @@ #' @include package.R service.R NULL +.paws.common.import.version <- "paws.common (>= 0.5.9)" + # Make all category-level packages. make_categories <- function(sdk_dir, out_dir, categories, service_names) { for (category in categories) { @@ -14,7 +16,7 @@ make_category <- function(category, service_names, sdk_dir, out_dir) { services <- category$services title <- category$title description <- category$description - imports <- "paws.common (>= 0.5.4)" + imports <- .paws.common.import.version version <- get_version(sdk_dir) if (is.null(name) || is.null(title) || is.null(description)) { @@ -34,6 +36,7 @@ make_category <- function(category, service_names, sdk_dir, out_dir) { for (service in services) { copy_files(service_names[[service]], from = sdk_dir, to = package_dir) } + copy_files("reexports", from = sdk_dir, to = package_dir) write_documentation(package_dir) } diff --git a/make.paws/R/cran_collection.R b/make.paws/R/cran_collection.R index c70858823..f267231ac 100644 --- a/make.paws/R/cran_collection.R +++ b/make.paws/R/cran_collection.R @@ -62,6 +62,7 @@ write_source_collection <- function(sdk_dir, } } write_list(clients, file.path(out_dir, "R", "paws.R")) + write_list(make_reexports(), file.path(out_dir, "R", "reexports_paws.common.R")) } # Add the category packages to the DESCRIPTION file's Imports. @@ -70,6 +71,7 @@ write_source_collection <- function(sdk_dir, # generate the package. write_imports_collection <- function(path, version, imports) { packages <- sprintf("%s (>= %s)", imports, version) + packages <- c(packages, .paws.common.import.version) desc::desc_set( Imports = paste0(packages, collapse = ","), file = file.path(path, "DESCRIPTION"), diff --git a/make.paws/R/operations.R b/make.paws/R/operations.R index 781fb7461..640fbc980 100644 --- a/make.paws/R/operations.R +++ b/make.paws/R/operations.R @@ -1,4 +1,5 @@ #' @include templates.R +#' @include utils.R NULL operation_file_template <- template( @@ -36,7 +37,7 @@ operation_template <- template( name = ${operation_name}, http_method = ${http_method}, http_path = ${http_path}, - paginator = list() + paginator = ${paginator} ) input <- .${service}$${operation_input} output <- .${service}$${operation_output} @@ -62,10 +63,32 @@ make_operation <- function(operation, api, doc_maker) { operation_input = get_operation_input(operation, api), operation_output = get_operation_output(operation), http_method = quoted(operation$http$method), - http_path = quoted(operation$http$requestUri) + http_path = quoted(operation$http$requestUri), + paginator = set_paginator(operation$paginators) ) } + +set_paginator <- function(paginator) { + if (!is.null(paginator)) { + output_token <- paginator$output_token + if (!is.null(output_token)) { + for (i in seq_along(output_token)) { + # output_token[[i]] <- strsplit(output_token[[i]], " ")[[1]][[1]] + output_token[i] <- list(trimws(strsplit(output_token[[i]], split = "||", fixed = T)[[1]])) + } + paginator$output_token <- unlist(output_token, use.names = FALSE) + paginator$input_token <- rep_len( + paginator$input_token, + length(paginator$output_token) + ) + } + paste(trimws(deparse(paginator)), collapse = " ") + } else { + "list()" + } +} + # Override operation name from extdata/operation_name_override.yml operation_name_override <- function(operation_name) { path <- system_file( diff --git a/make.paws/R/process_api.R b/make.paws/R/process_api.R index 4f54585a3..08d744b7a 100644 --- a/make.paws/R/process_api.R +++ b/make.paws/R/process_api.R @@ -27,6 +27,7 @@ make_code_files <- function(api) { result$interfaces <- make_interfaces_files(api) result$service <- make_service_files(api) result$custom <- make_custom_operations_files(api) + result$reexports <- make_reexports() return(result) } @@ -78,11 +79,23 @@ make_custom_operations_files <- function(api) { return(result) } +make_reexports <- function() { + result <- list() + from <- system_file("templates/reexports_paws.common.R", package = methods::getPackageName()) + filename <- "reexports_paws.common.R" + if (from != "" && file.exists(from)) { + contents <- readLines(from) + result[[file.path(CODE_DIR, filename)]] <- paste(contents, collapse = "\n") + } + return(result) +} + make_docs_files <- function(api) { result <- list() result$operations <- make_operations_files(api, doc_maker = make_docs_long) result$service <- make_service_files(api) result$custom <- make_custom_operations_files(api) + result$reexports <- make_reexports() return(result) } diff --git a/make.paws/R/read_api.R b/make.paws/R/read_api.R index ed5d86f08..4e5de4a51 100644 --- a/make.paws/R/read_api.R +++ b/make.paws/R/read_api.R @@ -14,6 +14,10 @@ read_api <- function(api_name, path) { examples <- jsonlite::read_json(files$examples) api <- merge_examples(api, examples$examples) } + if (!is.null(files$paginators)) { + paginators <- jsonlite::read_json(files$paginators) + api <- merge_paginators(api, paginators$pagination) + } region_config <- jsonlite::read_json(region_config_path) api <- merge_region_config(api, region_config) api <- fix_region_config(api) @@ -48,6 +52,16 @@ merge_examples <- function(api, examples) { return(api) } +# Returns an API object with paginators merged into the corresponding operations. +merge_paginators <- function(api, paginators) { + for (name in names(paginators)) { + operation <- api$operations[[name]] + operation[["paginators"]] <- paginators[[name]] + api$operations[[name]] <- operation + } + return(api) +} + # Returns an API object with region config info attached. Region config info # lists endpoints for each service and region, if different from the default. merge_region_config <- function(api, region_config) { diff --git a/make.paws/R/utils.R b/make.paws/R/utils.R index 864604098..417238e6d 100644 --- a/make.paws/R/utils.R +++ b/make.paws/R/utils.R @@ -13,7 +13,7 @@ parse_operations <- function(text) { ids <- rep(NA, length(text)) id <- 1 for (i in seq_along(text)) { - if (i > 1 && startsWith(text[i], "#'") && !startsWith(text[i-1], "#'")) { + if (i > 1 && startsWith(text[i], "#'") && !startsWith(text[i - 1], "#'")) { id <- id + 1 } ids[i] <- id @@ -35,9 +35,11 @@ parse_operation <- function(text) { comment_lines <- startsWith(text, "#'") comment <- text[comment_lines] code <- text[!comment_lines] - if (length(code) == 0 || all(code == "") || code[1] == "NULL") return(NULL) + if (length(code) == 0 || all(code == "") || code[1] == "NULL") { + return(NULL) + } func <- strsplit(code[1], " ")[[1]][1] - name <- substring(func, regexpr("_", func)+1) + name <- substring(func, regexpr("_", func) + 1) operation <- list( name = name, http = list(), @@ -65,14 +67,18 @@ system_file <- function(..., package = "base") { pkg_path <- find.package(package) subfolder <- list(...) if (length(subfolder) > 0) { - if (subfolder[[1]] == "src") + if (subfolder[[1]] == "src") { subfolder[[1]] <- "R" - else + } else { subfolder <- c("inst", subfolder) + } } path <- do.call(file.path, c(pkg_path, subfolder)) - if (file.exists(path)) return(path) - else return("") + if (file.exists(path)) { + return(path) + } else { + return("") + } } } @@ -125,3 +131,6 @@ get_url <- function(url, tries = 3) { return(NULL) }) } + +# helper function to make it easy to replace NULLs with default value +`%||%` <- function(x, y) if (is.null(x)) y else x diff --git a/make.paws/inst/templates/reexports_paws.common.R b/make.paws/inst/templates/reexports_paws.common.R new file mode 100644 index 000000000..99b43edae --- /dev/null +++ b/make.paws/inst/templates/reexports_paws.common.R @@ -0,0 +1,23 @@ +#' @importFrom paws.common paginate +#' @export +paws.common::paginate + +#' @importFrom paws.common paginate_lapply +#' @export +paws.common::paginate_lapply + +#' @importFrom paws.common paginate_sapply +#' @export +paws.common::paginate_sapply + +#' @importFrom paws.common config +#' @export +paws.common::config + +#' @importFrom paws.common credentials +#' @export +paws.common::credentials + +#' @importFrom paws.common creds +#' @export +paws.common::creds diff --git a/paws.common/DESCRIPTION b/paws.common/DESCRIPTION index 6f24aa200..20adb95f5 100644 --- a/paws.common/DESCRIPTION +++ b/paws.common/DESCRIPTION @@ -38,6 +38,7 @@ Roxygen: list(markdown = TRUE, roclets = c("rd", "namespace", "collate")) RoxygenNote: 7.2.3 Collate: 'RcppExports.R' + 'util.R' 'struct.R' 'handlers.R' 'logging.R' @@ -55,7 +56,6 @@ Collate: 'service.R' 'custom_dynamodb.R' 'custom_rds.R' - 'util.R' 'xmlutil.R' 'stream.R' 'custom_s3.R' @@ -70,6 +70,7 @@ Collate: 'idempotency.R' 'jsonutil.R' 'onLoad.R' + 'paginate.R' 'populate.R' 'populateutil.R' 'tags.R' diff --git a/paws.common/NAMESPACE b/paws.common/NAMESPACE index e965bd4f1..11fd598c1 100644 --- a/paws.common/NAMESPACE +++ b/paws.common/NAMESPACE @@ -24,6 +24,9 @@ export(new_handlers) export(new_operation) export(new_request) export(new_service) +export(paginate) +export(paginate_lapply) +export(paginate_sapply) export(paws_config_log) export(populate) export(send_request) diff --git a/paws.common/NEWS.md b/paws.common/NEWS.md index 14b5679fc..442caca95 100644 --- a/paws.common/NEWS.md +++ b/paws.common/NEWS.md @@ -1,6 +1,7 @@ # paws.common 0.5.9 * add expiration parameter to creds * add signature_version to config +* add the ability to paginate paws methods (#30) # paws.common 0.5.8 * fix mismatch apparent method as.list.struct (#634) diff --git a/paws.common/R/credential_providers.R b/paws.common/R/credential_providers.R index 7e9adde4f..ca03b0b29 100644 --- a/paws.common/R/credential_providers.R +++ b/paws.common/R/credential_providers.R @@ -4,6 +4,7 @@ #' @include dateutil.R #' @include iniutil.R #' @include logging.R +#' @include util.R NULL Creds <- struct( @@ -236,17 +237,17 @@ sso_credential_process <- function(sso_session, ) sso_cache <- file.path(root, ".aws", "sso", "cache", json_file) if (!file.exists(sso_cache)) { - stop(sprintf( + stopf( "Error loading SSO Token: Token for %s does not exist", input_str - ), call. = F) + ) } cache_creds <- jsonlite::fromJSON(sso_cache) if (!("accessToken" %in% names(cache_creds)) || !("expiresAt" %in% names(cache_creds))) { - stop(sprintf( + stopf( "Error loading SSO Token: Token for %s is invalid.", sso_start_url - ), call. = F) + ) } svc <- sso( config = list( @@ -494,11 +495,10 @@ iam_credentials_provider <- function() { no_credentials <- function() { message <- ( - if (isTRUE(getOption('paws.log_level') <= 2L)) { + if (isTRUE(getOption("paws.log_level") <= 2L)) { 'No compatible credentials provided. Use `options("paws.log_level" = 3L)` for more information.' } else { "No compatible credentials provided." - } - ) + }) stop(message, call. = FALSE) } diff --git a/paws.common/R/custom_s3.R b/paws.common/R/custom_s3.R index 3f372239b..3e8e28da2 100644 --- a/paws.common/R/custom_s3.R +++ b/paws.common/R/custom_s3.R @@ -7,15 +7,19 @@ NULL convert_file_to_raw <- function(request) { operation_name <- request$operation$name - if (operation_name != "PutObject") return(request) + if (operation_name != "PutObject") { + return(request) + } request_params <- request$params content_body <- request_params["Body"][[1]] - if (!is.character(content_body)) return(request) + if (!is.character(content_body)) { + return(request) + } file_name <- content_body[[1]] if (!file.exists(file_name)) { - stop(sprintf("Unable to find file: %s", file_name)) + stopf("Unable to find file: %s", file_name) } file_connection <- file(file_name, "rb") raw_body <- readBin(file_connection, "raw", n = file.size(file_name)) @@ -32,7 +36,9 @@ bucket_name_from_req_params <- function(request) { request_params <- request$params bucket <- request_params["Bucket"] - if (is.null(bucket)) return(NULL) + if (is.null(bucket)) { + return(NULL) + } bucket_name <- bucket[[1]] @@ -40,7 +46,9 @@ bucket_name_from_req_params <- function(request) { } host_compatible_bucket_name <- function(bucket) { - if (grepl(".", bucket, fixed = TRUE)) return(FALSE) + if (grepl(".", bucket, fixed = TRUE)) { + return(FALSE) + } domain <- "^[a-z0-9][a-z0-9\\.\\-]{1,61}[a-z0-9]$" ip_address <- "^(\\d+\\.){3}\\d+$" return(grepl(domain, bucket) && !grepl(ip_address, bucket)) @@ -51,7 +59,7 @@ move_bucket_to_host <- function(url, bucket) { url$path <- gsub("/\\{Bucket\\}", "", url$path) if (url$path == "") { - url$path = "/" + url$path <- "/" } return(url) @@ -89,7 +97,9 @@ remove_bucket_from_url <- function(url) { update_endpoint_for_s3_config <- function(request) { bucket_name <- bucket_name_from_req_params(request) - if (is.null(bucket_name)) return(request) + if (is.null(bucket_name)) { + return(request) + } if (is_access_point(bucket_name)) { request$http_request$url$host <- get_access_point_endpoint(bucket_name) @@ -97,9 +107,13 @@ update_endpoint_for_s3_config <- function(request) { return(request) } - if (!host_compatible_bucket_name(bucket_name)) return(request) + if (!host_compatible_bucket_name(bucket_name)) { + return(request) + } - if (request$operation$name %in% c("GetBucketLocation")) return(request) + if (request$operation$name %in% c("GetBucketLocation")) { + return(request) + } use_virtual_host_style <- TRUE if (request$config$s3_force_path_style) use_virtual_host_style <- FALSE @@ -116,10 +130,11 @@ update_endpoint_for_s3_config <- function(request) { ################################################################################ populate_location_constraint <- function(request) { - operation_name <- request$operation$name - if (operation_name != "CreateBucket") return(request) + if (operation_name != "CreateBucket") { + return(request) + } request_params <- request$params location <- request_params$CreateBucketConfiguration$LocationConstraint @@ -135,12 +150,16 @@ populate_location_constraint <- function(request) { content_md5 <- function(request) { operation_name <- request$operation$name - if (!(operation_name %in% c("PutBucketCors", "PutBucketLifecycle", - "PutBucketPolicy", "PutBucketTagging", - "DeleteObjects", - "PutBucketLifecycleConfiguration", - "PutBucketReplication", "PutObject", - "UploadPart"))) {return(request)} + if (!(operation_name %in% c( + "PutBucketCors", "PutBucketLifecycle", + "PutBucketPolicy", "PutBucketTagging", + "DeleteObjects", + "PutBucketLifecycleConfiguration", + "PutBucketReplication", "PutObject", + "UploadPart" + ))) { + return(request) + } # Create Content-MD5 header if missing. # https://github.com/aws/aws-sdk-go/blob/e2d6cb448883e4f4fcc5246650f89bde349041ec/private/checksum/content_md5.go#L18 if (is.null(request$http_request$header[["Content-MD5"]])) { @@ -156,7 +175,9 @@ content_md5 <- function(request) { ################################################################################ s3_unmarshal_select_object_content <- function(request) { - if (request$operation$name != "SelectObjectContent") return(request) + if (request$operation$name != "SelectObjectContent") { + return(request) + } payload <- stream_decode(request$http_response$body) request$data <- populate(list(Payload = payload), request$data) request$http_response$body <- raw() @@ -166,12 +187,17 @@ s3_unmarshal_select_object_content <- function(request) { ################################################################################ s3_unmarshal_get_bucket_location <- function(request) { - if (request$operation$name != "GetBucketLocation") return(request) + if (request$operation$name != "GetBucketLocation") { + return(request) + } response <- decode_xml(request$http_response$body) data <- request$data location <- response$LocationConstraint - if (length(location) == 0) location <- "us-east-1" - else location <- location[[1]] + if (length(location) == 0) { + location <- "us-east-1" + } else { + location <- location[[1]] + } if (location == "EU") location <- "eu-west-1" data$LocationConstraint <- location request$data <- data @@ -181,7 +207,6 @@ s3_unmarshal_get_bucket_location <- function(request) { ################################################################################ s3_unmarshal_error <- function(request) { - data <- tryCatch( decode_xml(request$http_response$body), error = function(e) NULL @@ -221,9 +246,11 @@ s3_unmarshal_error <- function(request) { message <- error_response$Message if (is.null(message) && is.null(code)) { - request$error <- Error("SerializationError", - "failed to decode query XML error response", - request$http_response$status_code) + request$error <- Error( + "SerializationError", + "failed to decode query XML error response", + request$http_response$status_code + ) return(request) } @@ -258,13 +285,13 @@ s3_endpoints <- list( # contains the endpoint the request should be sent to. This handler # will add the redirect information to the signing context and then # redirect the request. -s3_redirect_from_error <- function(request){ +s3_redirect_from_error <- function(request) { if (is.null(request$http_response)) { - return(request) + return(request) } if (isTRUE(request$context$s3_redirect)) { log_debug( - 'S3 request was previously redirected, not redirecting.' + "S3 request was previously redirected, not redirecting." ) return(request) } @@ -273,7 +300,7 @@ s3_redirect_from_error <- function(request){ # Exit s3_redirect_from_error function if initial request is successful # https://docs.aws.amazon.com/waf/latest/developerguide/customizing-the-response-status-codes.html http_success_code <- c(200, 201, 202, 204, 206) - if(error_code %in% http_success_code){ + if (error_code %in% http_success_code) { return(request) } error <- decode_xml(request$http_response$body)$Error @@ -283,18 +310,22 @@ s3_redirect_from_error <- function(request){ bucket_name <- bucket_name_from_req_params(request) new_region <- s3_get_bucket_region(request$http_response, error) if (is.null(new_region)) { - log_debug(paste( - "S3 client configured for region %s but the bucket %s is not", - "in that region and the proper region could not be", - "automatically determined."), + log_debug( + paste( + "S3 client configured for region %s but the bucket %s is not", + "in that region and the proper region could not be", + "automatically determined." + ), request$client_info$signing_region, bucket_name ) return(request) } - log_debug(paste( - "S3 client configured for region %s but the bucket %s is in region", - "%s; Please configure the proper region to avoid multiple", - "unnecessary redirects and signing attempts."), + log_debug( + paste( + "S3 client configured for region %s but the bucket %s is in region", + "%s; Please configure the proper region to avoid multiple", + "unnecessary redirects and signing attempts." + ), request$client_info$signing_region, bucket_name, new_region ) # Update client_info for redirect @@ -322,24 +353,24 @@ s3_redirect_from_error <- function(request){ return(request) } -can_be_redirected <- function(request, error_code, error){ +can_be_redirected <- function(request, error_code, error) { # We have to account for 400 responses because # if we sign a Head* request with the wrong region, # we'll get a 400 Bad Request but we won't get a # body saying it's an "AuthorizationHeaderMalformed". is_special_head_object <- ( - error_code %in% c('301', '400') & request$operation$name == 'HeadObject' + error_code %in% c("301", "400") & request$operation$name == "HeadObject" ) is_special_head_bucket <- ( - error_code %in% c('301', '400') - & request$operation$name == 'HeadBucket' - & 'x-amz-bucket-region' %in% names(request$http_response$header) + error_code %in% c("301", "400") & + request$operation$name == "HeadBucket" & + "x-amz-bucket-region" %in% names(request$http_response$header) ) is_wrong_signing_region <- ( - error$Code == 'AuthorizationHeaderMalformed' & 'Region' %in% names(error) + error$Code == "AuthorizationHeaderMalformed" & "Region" %in% names(error) ) is_redirect_status <- request$http_response$status_code %in% c(301, 302, 307) - is_permanent_redirect <- error$Code == 'PermanentRedirect' + is_permanent_redirect <- error$Code == "PermanentRedirect" return(any( c( @@ -348,9 +379,8 @@ can_be_redirected <- function(request, error_code, error){ is_permanent_redirect, is_special_head_bucket, is_redirect_status - ) ) - ) + )) } # There are multiple potential sources for the new region to redirect to, @@ -359,11 +389,11 @@ can_be_redirected <- function(request, error_code, error){ # HEAD on the bucket if all else fails. # param response: HttpResponse # param error: Error -s3_get_bucket_region <- function(response, error){ +s3_get_bucket_region <- function(response, error) { # First try to source the region from the headers. response_headers <- response$header - if ('x-amz-bucket-region' %in% names(response_headers)){ - return(response_headers[['x-amz-bucket-region']]) + if ("x-amz-bucket-region" %in% names(response_headers)) { + return(response_headers[["x-amz-bucket-region"]]) } # Next, check the error body region <- error$Region @@ -375,7 +405,7 @@ s3_get_bucket_region <- function(response, error){ # discarded by this function. set_request_url <- function(original_endpoint, new_endpoint, - use_new_scheme = TRUE){ + use_new_scheme = TRUE) { new_endpoint_components <- httr::parse_url(new_endpoint) original_endpoint_components <- httr::parse_url(original_endpoint) scheme <- original_endpoint_components$scheme @@ -383,14 +413,14 @@ set_request_url <- function(original_endpoint, scheme <- new_endpoint_components$scheme } final_endpoint_components <- structure(list( - scheme = scheme, - hostname = new_endpoint_components$hostname %||% "", - path = original_endpoint_components$path %||% "", - query = original_endpoint_components$query %||% "", - fragment = "", - raw_path = "", - raw_query = ""), class = "url" - ) + scheme = scheme, + hostname = new_endpoint_components$hostname %||% "", + path = original_endpoint_components$path %||% "", + query = original_endpoint_components$query %||% "", + fragment = "", + raw_path = "", + raw_query = "" + ), class = "url") final_endpoint <- build_url(final_endpoint_components) return(final_endpoint) } @@ -398,19 +428,31 @@ set_request_url <- function(original_endpoint, ################################################################################ customizations$s3 <- function(handlers) { - handlers$build <- handlers_add_front(handlers$build, - update_endpoint_for_s3_config) - handlers$build <- handlers_add_front(handlers$build, - populate_location_constraint) - handlers$build <- handlers_add_front(handlers$build, - convert_file_to_raw) - handlers$build <- handlers_add_back(handlers$build, - content_md5) + handlers$build <- handlers_add_front( + handlers$build, + update_endpoint_for_s3_config + ) + handlers$build <- handlers_add_front( + handlers$build, + populate_location_constraint + ) + handlers$build <- handlers_add_front( + handlers$build, + convert_file_to_raw + ) + handlers$build <- handlers_add_back( + handlers$build, + content_md5 + ) handlers$send <- handlers_add_back(handlers$send, s3_redirect_from_error) - handlers$unmarshal <- handlers_add_front(handlers$unmarshal, - s3_unmarshal_select_object_content) - handlers$unmarshal <- handlers_add_back(handlers$unmarshal, - s3_unmarshal_get_bucket_location) + handlers$unmarshal <- handlers_add_front( + handlers$unmarshal, + s3_unmarshal_select_object_content + ) + handlers$unmarshal <- handlers_add_back( + handlers$unmarshal, + s3_unmarshal_get_bucket_location + ) handlers$unmarshal_error <- handlers_set(s3_unmarshal_error) handlers } diff --git a/paws.common/R/iniutil.R b/paws.common/R/iniutil.R index 902bf2ef0..a43748b2e 100644 --- a/paws.common/R/iniutil.R +++ b/paws.common/R/iniutil.R @@ -1,3 +1,5 @@ +#' @include util.R + # Get the profile name from an ini file extract_ini_profile <- function(item) { profile <- gsub("\\[|\\]", "", item) @@ -17,7 +19,7 @@ extract_ini_parameter <- function(item) { # Read in values from an ini file read_ini <- function(file_name) { if (!file.exists(file_name)) { - stop(sprintf("Unable to find file: %s", file_name)) + stopf("Unable to find file: %s", file_name) } content <- scan(file_name, what = "", sep = "\n", quiet = T) profiles <- list() diff --git a/paws.common/R/net.R b/paws.common/R/net.R index 802acc2dc..db32708f9 100644 --- a/paws.common/R/net.R +++ b/paws.common/R/net.R @@ -1,5 +1,6 @@ #' @include struct.R #' @include url.R +#' @include util.R NULL # Construct an HTTP request object. @@ -63,7 +64,7 @@ new_http_request <- function(method, url, body = NULL, close = FALSE, connect_ti method <- "GET" } if (!valid_method(method)) { - stop(sprintf("invalid method: %s", method)) + stopf("invalid method: %s", method) } u <- parse_url(url) req <- HttpRequest( @@ -120,14 +121,14 @@ issue <- function(http_request) { # utilize httr to write to disk dest <- NULL - if(!is.null(http_request$dest)) { + if (!is.null(http_request$dest)) { dest <- httr::write_disk(http_request$dest) } r <- with_paws_verbose( httr::VERB( method, url = url, - config = c(httr::add_headers(.headers=headers), dest), + config = c(httr::add_headers(.headers = headers), dest), body = body, timeout ) @@ -139,7 +140,7 @@ issue <- function(http_request) { content_length = as.integer(httr::headers(r)$`content-length`), # Prevent reading in data when output is set body = ( - if(is.null(http_request$dest)) httr::content(r, as = "raw") else raw() + if (is.null(http_request$dest)) httr::content(r, as = "raw") else raw() ) ) @@ -163,7 +164,7 @@ is_compressed <- function(http_response) { } if (content_encoding == "gzip") { - bits_to_int <- function(x) sum(as.integer(x) * 2^(1:length(x)-1)) + bits_to_int <- function(x) sum(as.integer(x) * 2^(1:length(x) - 1)) cmf <- http_response$body[1] flg <- http_response$body[2] compression_method <- bits_to_int(rawToBits(cmf)[1:4]) diff --git a/paws.common/R/paginate.R b/paws.common/R/paginate.R new file mode 100644 index 000000000..80e4106c2 --- /dev/null +++ b/paws.common/R/paginate.R @@ -0,0 +1,327 @@ +#' @include logging.R +#' @include util.R + +.do_call <- as.name("do.call") + +#' @title Paginate over an operation. +#' @description +#' Some AWS operations return results that are incomplete and require subsequent +#' requests in order to attain the entire result set. The process of sending subsequent +#' requests to continue where a previous request left off is called pagination. +#' For example, the list_objects operation of Amazon S3 returns up to 1000 objects +#' at a time, and you must send subsequent requests with the appropriate Marker +#' in order to retrieve the next page of results. +#' +#' @param Operation The operation for example an s3 operation: \code{svc$list_buckets()} +#' @param MaxRetries Max number of retries call AWS API. +#' @param PageSize The size of each page. +#' @param MaxItems Limits the maximum number of total returned items returned while paginating. +#' @param StartingToken Can be used to modify the starting marker or token of a paginator. +#' This argument if useful for resuming pagination from a previous token or starting pagination at a known position. +#' @param FUN the function to be applied to each response element of \code{operation}. +#' @param simplify See \link[base:sapply]{base::sapply()}. +#' @param ... optional arguments to \code{FUN}. +#' @return list of responses from the operation. +#' @examples +#' \dontrun{ +#' # The following example retrieves object list. The request specifies max +#' # keys to limit response to include only 2 object keys. +#' paginate( +#' svc$list_objects_v2( +#' Bucket = "DOC-EXAMPLE-BUCKET" +#' ), +#' MaxItems = 50 +#' ) +#' } +#' @name paginate +#' @export +paginate <- function(Operation, + MaxRetries = 5, + PageSize = NULL, + MaxItems = NULL, + StartingToken = NULL) { + fn <- substitute(Operation) + # rebuild fn for do.call + if (identical(fn[[1]], .do_call)) { + kwargs <- eval(fn[[3]]) + fn <- fn[2] + for (key in names(kwargs)) { + fn[key] <- kwargs[[key]] + } + } + # update fn with pagesize and starting token + fn_update <- paginate_update_fn(fn, PageSize, StartingToken) + fn <- fn_update$fn + paginator <- fn_update$paginator + primary_result_key <- paginator$result_key[[1]] + no_items <- 0 + result <- list() + while (!identical(fn[[paginator$input_token[[1]]]], character(0))) { + resp <- retry_api_call(eval(fn), retries = MaxRetries) + new_tokens <- get_tokens(resp, paginator$output_token) + for (i in seq_along(new_tokens)) { + fn[[paginator$input_token[[i]]]] <- new_tokens[[i]] + } + result[[length(result) + 1]] <- resp + + # exit if no more results + if (!is.null(paginator$more_results)) { + if (isFALSE(resp[[paginator$more_results]])) { + break + } + } + if (!is.null(MaxItems)) { + no_items <- no_items + length(resp[[primary_result_key]]) + if (no_items >= MaxItems) { + break + } + } + } + return(result) +} + +#' @rdname paginate +#' @export +paginate_lapply <- function(Operation, + FUN, + ..., + MaxRetries = 5, + PageSize = NULL, + MaxItems = NULL, + StartingToken = NULL) { + FUN <- match.fun(FUN) + fn <- substitute(Operation) + + # rebuild fn for do.call + if (identical(fn[[1]], .do_call)) { + kwargs <- eval(fn[[3]]) + fn <- fn[2] + for (key in names(kwargs)) { + fn[key] <- kwargs[[key]] + } + } + # update fn with pagesize and starting token + fn_update <- paginate_update_fn(fn, PageSize, StartingToken) + result <- paginate_xapply( + fn = fn_update$fn, + paginator = fn_update$paginator, + FUN = FUN, + ..., + MaxRetries = MaxRetries, + MaxItems = MaxItems + ) + return(result) +} + +#' @rdname paginate +#' @export +paginate_sapply <- function(Operation, + FUN, + ..., + simplify = TRUE, + MaxRetries = 5, + PageSize = NULL, + MaxItems = NULL, + StartingToken = NULL) { + FUN <- match.fun(FUN) + fn <- substitute(Operation) + + # rebuild fn for do.call + if (identical(fn[[1]], .do_call)) { + kwargs <- eval(fn[[3]]) + fn <- fn[2] + for (key in names(kwargs)) { + fn[key] <- kwargs[[key]] + } + } + # update fn with pagesize and starting token + fn_update <- paginate_update_fn(fn, PageSize, StartingToken) + result <- paginate_xapply( + fn = fn_update$fn, + paginator = fn_update$paginator, + FUN = FUN, + ..., + MaxRetries = MaxRetries, + MaxItems = MaxItems + ) + + if (!isFALSE(simplify)) { + simplify2array(result, higher = (simplify == "array")) + } else { + result + } +} + +paginate_update_fn <- function( + fn, + PageSize = NULL, + StartingToken = NULL) { + fn_call <- eval(fn[[1]]) + pkg_name <- environmentName(environment(fn_call)) + + # Ensure method can be found. + if (!grepl("^paws", pkg_name, perl = T)) { + stopf( + "Unknown method: `%s`. Please check service methods and try again.", + as.character(fn)[1] + ) + } + + fn_body <- body(fn_call) + paginator <- fn_body[[2]][[3]]$paginator + + # Check if method can paginate + if (!all(c("input_token", "output_token") %in% names(paginator))) { + stopf( + "Method: `%s` is unable to paginate.", + as.character(fn)[1] + ) + } + + # only update input_token if single token + if (length(paginator$input_token) == 1) { + if (is.null(fn[[paginator$input_token]])) { + fn[paginator$input_token] <- StartingToken + } + } + if (!is.null(paginator$limit_key)) { + if (is.null(fn[[paginator$limit_key]])) { + fn[paginator$limit_key] <- PageSize + } + } + + return(list( + fn = fn, + paginator = eval(paginator) + )) +} + +paginate_xapply <- function( + fn, + paginator, + FUN, + ..., + MaxRetries = 5, + MaxItems = NULL) { + primary_result_key <- paginator$result_key[[1]] + no_items <- 0 + result <- list() + while (!identical(fn[[paginator$input_token[[1]]]], character(0))) { + resp <- retry_api_call(eval(fn), retries = MaxRetries) + new_tokens <- get_tokens(resp, paginator$output_token) + for (i in seq_along(new_tokens)) { + fn[[paginator$input_token[[i]]]] <- new_tokens[[i]] + } + result[[length(result) + 1]] <- FUN(resp, ...) + + # exit if no more results + if (!is.null(paginator$more_results)) { + if (isFALSE(resp[[paginator$more_results]])) { + break + } + } + if (!is.null(MaxItems)) { + no_items <- no_items + length(resp[[primary_result_key]]) + if (no_items >= MaxItems) { + break + } + } + } + return(result) +} + +# Get all output tokens +get_tokens <- function(resp, output_tokens) { + tokens <- list() + for (token in output_tokens) { + if (grepl("\\[-1\\]", token)) { + tokens[[token]] <- get_token_len(resp, token) + } else { + tokens[[token]] <- get_token_path(resp, token) + } + } + return(tokens) +} + +# Get Token along a response path: i.e. +# Path.To.Token +get_token_path <- function(resp, token) { + token_prts <- strsplit(token, "\\.")[[1]] + build_key <- character(length(token_prts)) + for (i in seq_along(token_prts)) { + build_key[i] <- token_prts[[i]] + } + location <- paste0('resp[["', paste(build_key, collapse = '"]][["'), '"]]') + return(eval(parse(text = location), envir = environment())) +} + +# Get Token from the last element in a response path: i.e. +# Path.To[-1].Token +get_token_len <- function(resp, token) { + last_element <- function(x) { + x[[length(x)]] + } + build_part <- function(x) { + paste0('last_element(resp[["', paste0(x, collapse = '"]][["'), '"]])') + } + token_prts <- strsplit(token, "\\.")[[1]] + + build_key <- character(0) + for (i in seq_along(token_prts)) { + if (grepl("\\[-1\\]", token_prts[[i]])) { + build_key[length(build_key) + 1] <- gsub("\\[-1\\]", "", token_prts[[i]]) + build_key <- build_part(build_key) + } else { + build_key[length(build_key) + 1] <- token_prts[[i]] + } + } + location <- paste0(paste(build_key, collapse = '[["'), '"]]') + tryCatch( + { + return(eval(parse(text = location), envir = environment())) + }, + error = function(e) { + # Return default character(0) for empty lists + if (grepl( + "attempt to select less than one element in integerOneIndex", + e$message, + perl = T + )) { + character(0) + } else { + stop(e) + } + } + ) +} + +# See https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html +retry_api_call <- function(expr, retries) { + for (i in seq_len(retries + 1)) { + tryCatch( + { + return(eval.parent(substitute(expr))) + }, + error = function(err) { + msg <- err$message + + # Only Retry rate exceeded errors. + if (grepl("rate exceeded", msg, ignore.case = T)) { + exp_back_off(err, i, retries) + } else { + stop(err) + } + } + ) + } +} + +# Retry with exponential backoff with jitter +exp_back_off <- function(error, i, retries) { + if (i == (retries + 1)) { + stop(error) + } + time <- min(runif(1) * 2^i, 20) + log_error("Request failed. Retrying in %s seconds...", time) + Sys.sleep(time) +} diff --git a/paws.common/R/populate.R b/paws.common/R/populate.R index b7a008536..773e7bb0a 100644 --- a/paws.common/R/populate.R +++ b/paws.common/R/populate.R @@ -1,9 +1,13 @@ +#' @include util.R + # Sometimes the locationName is different from the interface name -check_location_name <- function(name, interface){ +check_location_name <- function(name, interface) { location_names <- sapply(interface, function(x) tag_get(x, "locationName")) in_location_names <- name %in% location_names - if (!in_location_names) return(in_location_names) + if (!in_location_names) { + return(in_location_names) + } location_index <- which(name == location_names) @@ -16,15 +20,20 @@ populate_structure <- function(input, interface) { # If interface is empty (input shape is incomplete), return the input data. # Only needed because input shapes have fixed depth, and some services, # e.g. DynamoDB, can accept data of arbitrary depth. - if (length(interface) == 0) return(input) + if (length(interface) == 0) { + return(input) + } for (name in names(input)) { if (!(name) %in% names(interface)) { check_location <- check_location_name(name, interface) - if (!check_location) - stop(sprintf("invalid name: %s", name)) + if (!check_location) { + stopf("invalid name: %s", name) + } - interface[[check_location]] <- populate(input[[name]], - interface[[check_location]]) + interface[[check_location]] <- populate( + input[[name]], + interface[[check_location]] + ) } else { interface[[name]] <- populate(input[[name]], interface[[name]]) } @@ -36,7 +45,9 @@ populate_list <- function(input, interface) { # If interface is empty (input shape is incomplete), return the input data. # Only needed because input shapes have fixed depth, and some services, # e.g. DynamoDB, can accept data of arbitrary depth. - if (length(interface) == 0) return(input) + if (length(interface) == 0) { + return(input) + } attrs <- attributes(interface) interface <- lapply(input, function(x) populate(x, interface[[1]])) attributes(interface) <- attrs @@ -47,7 +58,9 @@ populate_map <- function(input, interface) { # If interface is empty (input shape is incomplete), return the input data. # Only needed because input shapes have fixed depth, and some services, # e.g. DynamoDB, can accept data of arbitrary depth. - if (length(interface) == 0) return(input) + if (length(interface) == 0) { + return(input) + } result <- list() for (name in names(input)) { result[[name]] <- populate(input[[name]], interface[[1]]) @@ -82,8 +95,7 @@ populate_scalar <- function(input, interface) { #' @export populate <- function(input, interface) { t <- tag_get(interface, "type") - populate_fn <- switch( - t, + populate_fn <- switch(t, structure = populate_structure, list = populate_list, map = populate_map, diff --git a/paws.common/R/struct.R b/paws.common/R/struct.R index 68bd002bb..6d735c354 100644 --- a/paws.common/R/struct.R +++ b/paws.common/R/struct.R @@ -1,3 +1,5 @@ +#' @include util.R + # Create a constructor function for a named list data structure, where the # values of its elements be changed but none can be added or deleted. # `MyList <- struct(a = 1, b = 2)` will create a function to construct a @@ -20,7 +22,7 @@ struct <- function(...) { #' @export `[.struct` <- function(x, key) { if (!(key %in% names(x))) { - stop(sprintf("invalid element: %s", key)) + stopf("invalid element: %s", key) } value <- x[[key]] return(value) @@ -35,7 +37,7 @@ struct <- function(...) { #' @export `[<-.struct` <- function(x, key, value) { if (!(key %in% names(x))) { - stop(sprintf("invalid element: %s", key)) + stopf("invalid element: %s", key) } cl <- oldClass(x) class(x) <- NULL diff --git a/paws.common/R/util.R b/paws.common/R/util.R index 718fd5299..8539d45e6 100644 --- a/paws.common/R/util.R +++ b/paws.common/R/util.R @@ -17,7 +17,9 @@ #' #' @export is_empty <- function(x) { - if (is.null(x) || length(x) == 0) return(TRUE) + if (is.null(x) || length(x) == 0) { + return(TRUE) + } UseMethod("is_empty") } @@ -60,7 +62,9 @@ is_empty.default <- function(x) { #' #' @export is_empty_xml <- function(x) { - if (is.null(x) || is_empty_logical(x) || is_empty_character(x)) return(TRUE) + if (is.null(x) || is_empty_logical(x) || is_empty_character(x)) { + return(TRUE) + } UseMethod("is_empty_xml") } @@ -76,7 +80,9 @@ is_empty_xml.raw <- is_empty.raw is_empty_xml.list <- function(x) { # keep empty lists when parsed from parameters # issue: https://github.com/paws-r/paws/issues/537 - if(length(x) == 0) return (FALSE) + if (length(x) == 0) { + return(FALSE) + } return(all(sapply(x, is_empty_xml))) } @@ -109,7 +115,7 @@ call_with_args <- function(f, data) { } # helper function to make it easy to replace NULLs with default value -`%||%` <- function(x,y) if(is.null(x)) y else x +`%||%` <- function(x, y) if (is.null(x)) y else x sort_list <- function(x) { if (length(x) == 0) { @@ -125,7 +131,7 @@ str_match <- function(str, pattern) { # Get parameter names from http_path template: get_template_params <- function(str) { - out <- str_match(str, '\\{(.*?)}') + out <- str_match(str, "\\{(.*?)}") return(out[grep("\\{.*\\}", out, invert = T, perl = T)]) } @@ -137,7 +143,7 @@ sprintf_template <- function(template) { auth_temp <- temp_split[grepl("\\{.*\\}", temp_split)] # set template to sprintf format - m <- gregexpr('\\{(.*?)}', auth_temp, perl = T) + m <- gregexpr("\\{(.*?)}", auth_temp, perl = T) regmatches(auth_temp, m) <- "%s" return(auth_temp) } @@ -148,7 +154,7 @@ sprintf_template <- function(template) { # render params into http_path template # for example: # /{Bucket}/{Key+} -> /demo_bucket/path/to/file -render_template <- function(request){ +render_template <- function(request) { template <- request$operation$http_path template_params <- get_template_params(template) encoded_params <- vector("list", length(template_params)) @@ -156,7 +162,8 @@ render_template <- function(request){ for (p in template_params) { if (grepl("\\+", p, perl = TRUE)) { encoded_params[[p]] <- paws_url_encoder( - request$params[[gsub("\\+", "", p, perl = TRUE)]], safe = "/~" + request$params[[gsub("\\+", "", p, perl = TRUE)]], + safe = "/~" ) } else { encoded_params[[p]] <- paws_url_encoder( @@ -172,8 +179,8 @@ LABEL_RE <- "[a-z0-9][a-z0-9\\-]*[a-z0-9]" # Developed from: # https://github.com/boto/botocore/blob/cc3f1c22f55ba50ca792eb73e7a6f721abdcc5ee/botocore/utils.py#L1275-L1295 -check_dns_name <- function(bucket_name){ - if (grepl("\\.", bucket_name, perl=TRUE)) { +check_dns_name <- function(bucket_name) { + if (grepl("\\.", bucket_name, perl = TRUE)) { return(FALSE) } n <- nchar(bucket_name) @@ -208,3 +215,7 @@ get_auth <- function(request) { } return(auth_path) } + +stopf <- function(fmt, ...) { + stop(sprintf(fmt, ...), call. = FALSE) +} diff --git a/paws.common/man/paginate.Rd b/paws.common/man/paginate.Rd new file mode 100644 index 000000000..51113743c --- /dev/null +++ b/paws.common/man/paginate.Rd @@ -0,0 +1,78 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/paginate.R +\name{paginate} +\alias{paginate} +\alias{paginate_lapply} +\alias{paginate_sapply} +\title{Paginate over an operation.} +\usage{ +paginate( + Operation, + MaxRetries = 5, + PageSize = NULL, + MaxItems = NULL, + StartingToken = NULL +) + +paginate_lapply( + Operation, + FUN, + ..., + MaxRetries = 5, + PageSize = NULL, + MaxItems = NULL, + StartingToken = NULL +) + +paginate_sapply( + Operation, + FUN, + ..., + simplify = TRUE, + MaxRetries = 5, + PageSize = NULL, + MaxItems = NULL, + StartingToken = NULL +) +} +\arguments{ +\item{Operation}{The operation for example an s3 operation: \code{svc$list_buckets()}} + +\item{MaxRetries}{Max number of retries call AWS API.} + +\item{PageSize}{The size of each page.} + +\item{MaxItems}{Limits the maximum number of total returned items returned while paginating.} + +\item{StartingToken}{Can be used to modify the starting marker or token of a paginator. +This argument if useful for resuming pagination from a previous token or starting pagination at a known position.} + +\item{FUN}{the function to be applied to each response element of \code{operation}.} + +\item{...}{optional arguments to \code{FUN}.} + +\item{simplify}{See \link[base:sapply]{base::sapply()}.} +} +\value{ +list of responses from the operation. +} +\description{ +Some AWS operations return results that are incomplete and require subsequent +requests in order to attain the entire result set. The process of sending subsequent +requests to continue where a previous request left off is called pagination. +For example, the list_objects operation of Amazon S3 returns up to 1000 objects +at a time, and you must send subsequent requests with the appropriate Marker +in order to retrieve the next page of results. +} +\examples{ +\dontrun{ +# The following example retrieves object list. The request specifies max +# keys to limit response to include only 2 object keys. +paginate( + svc$list_objects_v2( + Bucket = "DOC-EXAMPLE-BUCKET" + ), + MaxItems = 50 +) +} +} diff --git a/paws.common/tests/testthat/test_paginate.R b/paws.common/tests/testthat/test_paginate.R new file mode 100644 index 000000000..15194cae3 --- /dev/null +++ b/paws.common/tests/testthat/test_paginate.R @@ -0,0 +1,570 @@ +######################################################################## +# get_tokens +######################################################################## + +test_that("check token is correctly retrieved", { + output_tokens <- list( + "NextToken", + "Contents.Keys[-1].Id", + "Mark.NextToken" + ) + resp <- list( + NextToken = "token1", + Contents = list(Keys = list(list(Id = "wrong_token"), list(Id = "token2"))), + Mark = list(NextToken = "token3") + ) + expected <- setNames(list("token1", "token2", "token3"), output_tokens) + actual <- get_tokens(resp, output_tokens) + expect_equal(actual, expected) +}) + +test_that("check empty token is returned", { + output_tokens <- list( + "NextToken", + "Contents[-1].Id" + ) + resp <- list( + NextToken = character(0), + Contents = list() + ) + expected <- setNames(list(character(0), character(0)), output_tokens) + actual <- get_tokens(resp, output_tokens) + expect_equal(actual, expected) +}) + +######################################################################## +# retry_api_call +######################################################################## + +test_that("check if retry_api_call retries correctly", { + mock_exp_back_off <- mock2(side_effect = exp_back_off) + mockery::stub(retry_api_call, "exp_back_off", mock_exp_back_off) + + expect_error(retry_api_call(stop("rate exceeded"), 2)) + expect_equal(mock_call_no(mock_exp_back_off), 3) +}) + +test_that("check if retry_api_call doesn't retry", { + mock_exp_back_off <- mock2(side_effect = exp_back_off) + mockery::stub(retry_api_call, "exp_back_off", mock_exp_back_off) + + expect_error(retry_api_call(stop("error"), 2)) + expect_equal(mock_call_no(mock_exp_back_off), 0) +}) + +######################################################################## +# paginate_update_fn +######################################################################## + +test_that("check paginate_update_fn", { + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + + mock_environmentName <- mock2("paws.storage") + mockery::stub(paginate_update_fn, "environmentName", mock_environmentName) + actual <- paginate_update_fn(substitute(dummy_op(x = "hi")), PageSize = 10, StartingToken = "token1") + expect_fn <- substitute(dummy_op(x = "hi", NextToken = "token1", MaxKey = 10)) + expect_paginator <- list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + + expect_equal(actual$fn, expect_fn) + expect_equal(actual$paginator, expect_paginator) +}) + +test_that("check paginate_update_fn non paws operation", { + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + + expect_error( + paginate_update_fn( + substitute(dummy_op(x = "hi")), PageSize = 10, StartingToken = "token1" + ), + "Unknown method:.*. Please check service methods and try again." + ) +}) + +test_that("check paginate_update_fn unable to paginate", { + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list()) + list(NextToken=NextToken, MaxKey=MaxKey) + } + + mock_environmentName <- mock2("paws.storage") + mockery::stub(paginate_update_fn, "environmentName", mock_environmentName) + expect_error( + paginate_update_fn( + substitute(dummy_op(x = "hi")), PageSize = 10, StartingToken = "token1" + ), + "Method:.*is unable to paginate" + ) +}) + + +######################################################################## +# paginate +######################################################################## + +test_that("check paginate", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + mock_substitute <- mock2(substitute(dummy_op(x = "hi"))) + + mock_paginate_update_fn <- mock2( + list( + fn = substitute(dummy_op(x = "hi")), + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + ) + mock_retry_api_call <- mock2( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + expected <- list( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + mockery::stub(paginate, "substitute", mock_substitute) + mockery::stub(paginate, "paginate_update_fn", mock_paginate_update_fn) + mockery::stub(paginate, "retry_api_call", mock_retry_api_call) + + actual <- paginate("dummy") + actual_args <- mockery::mock_args(mock_retry_api_call) + + expect_equal(actual_args, list( + list(list(NextToken = NULL, MaxKey = NULL), retries = 5), + list(list(NextToken = "token1", MaxKey = NULL), retries = 5), + list(list(NextToken = "token2", MaxKey = NULL), retries = 5) + )) + expect_equal(actual, expected) +}) + +test_that("check paginate do.call", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + mock_substitute <- mock2(substitute(do.call(dummy_op, list(x = "hi")))) + + mock_paginate_update_fn <- mock2( + list( + fn = substitute(dummy_op(x = "hi")), + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + ) + mock_retry_api_call <- mock2( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + expected <- list( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + mockery::stub(paginate, "substitute", mock_substitute) + mockery::stub(paginate, "paginate_update_fn", mock_paginate_update_fn) + mockery::stub(paginate, "retry_api_call", mock_retry_api_call) + + actual <- paginate("dummy") + actual_args <- mockery::mock_args(mock_retry_api_call) + + expect_equal(actual_args, list( + list(list(NextToken = NULL, MaxKey = NULL), retries = 5), + list(list(NextToken = "token1", MaxKey = NULL), retries = 5), + list(list(NextToken = "token2", MaxKey = NULL), retries = 5) + )) + expect_equal(actual, expected) +}) + +test_that("check paginate restrict MaxItems", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + mock_substitute <- mock2(substitute(do.call(dummy_op, list(x = "hi")))) + + mock_paginate_update_fn <- mock2( + list( + fn = substitute(dummy_op(x = "hi")), + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + ) + mock_retry_api_call <- mock2( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + expected <- list( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2") + ) + + mockery::stub(paginate, "substitute", mock_substitute) + mockery::stub(paginate, "paginate_update_fn", mock_paginate_update_fn) + mockery::stub(paginate, "retry_api_call", mock_retry_api_call) + + actual <- paginate("dummy", MaxItems = 2) + actual_args <- mockery::mock_args(mock_retry_api_call) + + expect_equal(actual_args, list( + list(list(NextToken = NULL, MaxKey = NULL), retries = 5), + list(list(NextToken = "token1", MaxKey = NULL), retries = 5) + )) + expect_equal(actual, expected) +}) + +######################################################################## +# paginate_xapply +######################################################################## + +test_that("check paginate_xapply", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + + mock_retry_api_call <- mock2( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + expected <- list( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + mockery::stub(paginate_xapply, "retry_api_call", mock_retry_api_call) + + actual <- paginate_xapply( + substitute(dummy_op(x = "hi")), + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ), + FUN = function(resp) {resp}, + MaxRetries = 5, + MaxItems = NULL + ) + actual_args <- mockery::mock_args(mock_retry_api_call) + + expect_equal(actual_args, list( + list(list(NextToken = NULL, MaxKey = NULL), retries = 5), + list(list(NextToken = "token1", MaxKey = NULL), retries = 5), + list(list(NextToken = "token2", MaxKey = NULL), retries = 5) + )) + expect_equal(actual, expected) +}) + +test_that("check paginate_xapply restrict MaxItems", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + + mock_retry_api_call <- mock2( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2"), + list(Contents = list("zoo"), NextToken = character()) + ) + + expected <- list( + list(Contents = list("foo"), NextToken = "token1"), + list(Contents = list("bar"), NextToken = "token2") + ) + + mockery::stub(paginate_xapply, "retry_api_call", mock_retry_api_call) + + actual <- paginate_xapply( + substitute(dummy_op(x = "hi")), + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ), + FUN = function(resp) {resp}, + MaxRetries = 5, + MaxItems = 2 + ) + actual_args <- mockery::mock_args(mock_retry_api_call) + + expect_equal(actual_args, list( + list(list(NextToken = NULL, MaxKey = NULL), retries = 5), + list(list(NextToken = "token1", MaxKey = NULL), retries = 5) + )) + expect_equal(actual, expected) +}) + + +######################################################################## +# paginate_lapply +######################################################################## +test_that("check paginate_lapply", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + mock_substitute <- mock2(substitute(dummy_op(x = "hi"))) + + mock_paginate_update_fn <- mock2(side_effect = function(fn, ...) { + list( + fn = fn, + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + }) + mock_paginate_xapply <- mock2() + + mockery::stub(paginate_lapply, "substitute", mock_substitute) + mockery::stub(paginate_lapply, "paginate_update_fn", mock_paginate_update_fn) + mockery::stub(paginate_lapply, "paginate_xapply", mock_paginate_xapply) + + actual <- paginate_lapply("dummy", \(resp) {resp}) + actual_fn <- mock_arg(mock_paginate_update_fn)[[1]] + + expect_equal(actual_fn, substitute(dummy_op(x = "hi"))) +}) + +test_that("check paginate_lapply do.call modified operation", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + mock_substitute <- mock2(substitute(do.call(dummy_op, list(x = "hi")))) + + mock_paginate_update_fn <- mock2(side_effect = function(fn, ...) { + list( + fn = fn, + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + }) + mock_paginate_xapply <- mock2() + + mockery::stub(paginate_lapply, "substitute", mock_substitute) + mockery::stub(paginate_lapply, "paginate_update_fn", mock_paginate_update_fn) + mockery::stub(paginate_lapply, "paginate_xapply", mock_paginate_xapply) + + actual <- paginate_lapply("dummy", \(resp) {resp}) + actual_fn <- mock_arg(mock_paginate_update_fn)[[1]] + + expect_equal(actual_fn, substitute(dummy_op(x = "hi"))) +}) + +######################################################################## +# paginate_sapply +######################################################################## +test_that("check paginate_sapply", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + mock_substitute <- mock2(substitute(dummy_op(x = "hi"))) + + mock_paginate_update_fn <- mock2(side_effect = function(fn, ...) { + list( + fn = fn, + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + }) + mock_paginate_xapply <- mock2() + + mockery::stub(paginate_sapply, "substitute", mock_substitute) + mockery::stub(paginate_sapply, "paginate_update_fn", mock_paginate_update_fn) + mockery::stub(paginate_sapply, "paginate_xapply", mock_paginate_xapply) + + actual <- paginate_sapply("dummy", \(resp) {resp}) + actual_fn <- mock_arg(mock_paginate_update_fn)[[1]] + + expect_equal(actual_fn, substitute(dummy_op(x = "hi"))) +}) + +test_that("check paginate_sapply do.call modified operation", { + + dummy_internal <- function(paginator) { + paginator + } + dummy_op <- function(x, NextToken=NULL, MaxKey=NULL) { + op <- dummy_internal(paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + list(NextToken=NextToken, MaxKey=MaxKey) + } + mock_substitute <- mock2(substitute(do.call(dummy_op, list(x = "hi")))) + + mock_paginate_update_fn <- mock2(side_effect = function(fn, ...) { + list( + fn = fn, + paginator = list( + input_token = "NextToken", + output_token = "NextToken", + limit_key = "MaxKey", + result_key = "Contents" + ) + ) + }) + mock_paginate_xapply <- mock2() + + mockery::stub(paginate_sapply, "substitute", mock_substitute) + mockery::stub(paginate_sapply, "paginate_update_fn", mock_paginate_update_fn) + mockery::stub(paginate_sapply, "paginate_xapply", mock_paginate_xapply) + + actual <- paginate_sapply("dummy", \(resp) {resp}) + actual_fn <- mock_arg(mock_paginate_update_fn)[[1]] + + expect_equal(actual_fn, substitute(dummy_op(x = "hi"))) +})