Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added custom retry with backoff for error not covered in BQ client (4… #1174

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -100,9 +103,6 @@ public void run(ActionContext context) throws Exception {
// Enable legacy SQL
builder.setUseLegacySql(config.isLegacySQL());

// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();

// API request - starts the query.
Credentials credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
Expand All @@ -126,13 +126,17 @@ public void run(ActionContext context) throws Exception {

QueryJobConfiguration queryConfig = builder.build();

Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

LOG.info("Executing SQL as job {}.", jobId.getJob());
LOG.debug("The BigQuery SQL is {}", config.getSql());
// Setting external retry strategy for BigQuery client due to BigQuery Client not retrying when a job clashes
// with another job, due to error being 400.

// Wait for the query to complete
queryJob.waitFor();
final String retryableStringPattern = "Retrying the job with back-off";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: make this a constant

List<Function<BigQueryException, Boolean>> retryRules = new ArrayList<>();
retryRules.add(
(BigQueryException e) -> e.getCode() == 400
&& (e.getMessage().contains(retryableStringPattern) || e.getReason().contains(retryableStringPattern))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the methods on BigQueryException like getMessage() and getReason() guaranteed to be non null?

);
Job queryJob = executeQueryJobWithCustomRetry(bigQuery, queryConfig, retryRules);

// Check for errors
if (queryJob.getStatus().getError() != null) {
Expand Down Expand Up @@ -169,6 +173,46 @@ public void run(ActionContext context) throws Exception {
context.getMetrics().gauge(RECORDS_PROCESSED, rows);
}

/**
* Executes Query with added retry rules following:
* https://cloud.google.com/bigquery/sla
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also apply to BigQuery Sink plugin and may be even replication BQ plugins? Or for the time being it is only done for execute? It would be good to put this in a different class if it is applicable for other plugins.

*/
private Job executeQueryJobWithCustomRetry(BigQuery bigQuery, QueryJobConfiguration queryConfig,
List<Function<BigQueryException, Boolean>> retryRules) throws Exception {
// The longest amount of time to wait in-between retries.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: mention the timeunit

final int maximumBackoff = 32;

// The maximum number of retries.
final int maxRetries = 20;

int retries = 0;

while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a unit test.

try {
// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
LOG.info("Executing SQL as job {}.", jobId.getJob());
LOG.debug("The BigQuery SQL is {}", config.getSql());

// Wait for the query to complete
queryJob.waitFor();
return queryJob;
} catch (BigQueryException bigQueryException) {
if (retries >= maxRetries) {
LOG.error("Run out of retries while executing query with backoff.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Ran out of...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

throw bigQueryException;
}
if (retryRules.stream().noneMatch((f -> f.apply(bigQueryException)))) {
throw bigQueryException;
}
LOG.warn("Received {} error from BigQuery, retrying...", bigQueryException.getMessage());
Thread.sleep(Math.round((Math.min(Math.pow(2, retries), maximumBackoff) + Math.random()) * 1000));
retries += 1;
}
}
}

@Override
public AbstractBigQueryActionConfig getConfig() {
return config;
Expand Down