Skip to content

Commit

Permalink
Added custom retry with backoff for error not covered in BQ client (4…
Browse files Browse the repository at this point in the history
…00, please retry with backoff)
  • Loading branch information
sechegaray committed Oct 24, 2022
1 parent 097fd84 commit 4e7ef71
Showing 1 changed file with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -100,9 +104,6 @@ public void run(ActionContext context) throws Exception {
// Enable legacy SQL
builder.setUseLegacySql(config.isLegacySQL());

// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();

// API request - starts the query.
Credentials credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
Expand All @@ -126,13 +127,17 @@ public void run(ActionContext context) throws Exception {

QueryJobConfiguration queryConfig = builder.build();

Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

LOG.info("Executing SQL as job {}.", jobId.getJob());
LOG.debug("The BigQuery SQL is {}", config.getSql());
// Setting external retry strategy for BigQuery client due to BigQuery Client not retrying when a job clashes
// with another job, due to error being 400.

// Wait for the query to complete
queryJob.waitFor();
final String retryableStringPattern = "Retrying the job with back-off";
List<Function<BigQueryException, Boolean>> retryRules = new ArrayList<>();
retryRules.add(
(BigQueryException e) -> !((e.getCode() == 400)
&& (e.getMessage().contains(retryableStringPattern) || e.getReason().contains(retryableStringPattern)))
);
Job queryJob = executeQueryJobWithCustomRetry(bigQuery, queryConfig, retryRules);

// Check for errors
if (queryJob.getStatus().getError() != null) {
Expand Down Expand Up @@ -169,6 +174,47 @@ public void run(ActionContext context) throws Exception {
context.getMetrics().gauge(RECORDS_PROCESSED, rows);
}

/**
* Executes Query with added retry rules following:
* https://cloud.google.com/bigquery/sla
*/
private Job executeQueryJobWithCustomRetry(BigQuery bigQuery, QueryJobConfiguration queryConfig,
List<Function<BigQueryException, Boolean>> retryRules) throws Exception {
// The longest amount of time to wait in-between retries.
final int maximum_backoff = 32;

// The maximum number of retries.
final int max_retries = 20;

int retries = 0;

while (true) {
try {
// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
LOG.info("Executing SQL as job {}.", jobId.getJob());
LOG.debug("The BigQuery SQL is {}", config.getSql());

// Wait for the query to complete
queryJob.waitFor();
return queryJob;
} catch (BigQueryException bigQueryException) {
if (retries >= max_retries) {
LOG.error("Run out of retries while executing query with backoff.");
throw bigQueryException;
}
if (retryRules.stream().noneMatch((f -> f.apply(bigQueryException)))) {
throw bigQueryException;
}
LOG.warn("Received {} error from BigQuery, retrying...", bigQueryException.getMessage());
long sleep_time = Math.round((Math.min(Math.pow(2, retries), maximum_backoff) + Math.random()) * 1000);
Thread.sleep(sleep_time);
retries += 1;
}
}
}

@Override
public AbstractBigQueryActionConfig getConfig() {
return config;
Expand Down

0 comments on commit 4e7ef71

Please sign in to comment.