diff --git a/pom.xml b/pom.xml
index 691125d..6427907 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,7 @@
27.0.1-jre
2.6.0
1.10
+ 3.3.2
@@ -60,6 +61,11 @@
+
+ dev.failsafe
+ failsafe
+ ${failsafe.version}
+
com.google.guava
guava
diff --git a/src/main/java/io/cdap/plugin/ariba/source/AribaBatchSource.java b/src/main/java/io/cdap/plugin/ariba/source/AribaBatchSource.java
index d628ad2..b706c55 100644
--- a/src/main/java/io/cdap/plugin/ariba/source/AribaBatchSource.java
+++ b/src/main/java/io/cdap/plugin/ariba/source/AribaBatchSource.java
@@ -32,7 +32,6 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.plugin.ariba.source.config.AribaPluginConfig;
-import io.cdap.plugin.ariba.source.connector.AribaConnector;
import io.cdap.plugin.ariba.source.exception.AribaException;
import io.cdap.plugin.ariba.source.util.AribaUtil;
import io.cdap.plugin.ariba.source.util.ResourceConstants;
@@ -69,7 +68,12 @@ public class AribaBatchSource extends BatchSource getSplits(JobContext jobContext) throws IOException {
AribaPluginConfig pluginConfig = getPluginConfig(jobContext);
- AribaServices aribaServices = new AribaServices(pluginConfig.getConnection());
+ AribaServices aribaServices = new AribaServices(pluginConfig.getConnection(),
+ pluginConfig.getMaxRetryCount(),
+ pluginConfig.getInitialRetryDuration(),
+ pluginConfig.getMaxRetryDuration(),
+ pluginConfig.getRetryMultiplier(),
+ true);
boolean previewEnabled = Boolean.parseBoolean(jobContext.getConfiguration().
get(ResourceConstants.IS_PREVIEW_ENABLED));
@@ -66,7 +71,12 @@ public List getSplits(JobContext jobContext) throws IOException {
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext
taskAttemptContext) throws IOException {
AribaPluginConfig pluginConfig = getPluginConfig(taskAttemptContext);
- AribaServices aribaServices = new AribaServices(pluginConfig.getConnection());
+ AribaServices aribaServices = new AribaServices(pluginConfig.getConnection(),
+ pluginConfig.getMaxRetryCount(),
+ pluginConfig.getInitialRetryDuration(),
+ pluginConfig.getMaxRetryDuration(),
+ pluginConfig.getRetryMultiplier(),
+ true);
Schema outputSchema = Schema.parseJson(taskAttemptContext.getConfiguration().get(ResourceConstants.OUTPUT_SCHEMA));
return new AribaRecordReader(aribaServices, outputSchema, pluginConfig);
}
diff --git a/src/main/java/io/cdap/plugin/ariba/source/AribaServices.java b/src/main/java/io/cdap/plugin/ariba/source/AribaServices.java
index f089e7c..f649247 100644
--- a/src/main/java/io/cdap/plugin/ariba/source/AribaServices.java
+++ b/src/main/java/io/cdap/plugin/ariba/source/AribaServices.java
@@ -21,10 +21,14 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.ariba.source.config.AribaPluginConfig;
import io.cdap.plugin.ariba.source.connector.AribaConnectorConfig;
import io.cdap.plugin.ariba.source.exception.AribaException;
+import io.cdap.plugin.ariba.source.exception.AribaRetryableException;
import io.cdap.plugin.ariba.source.metadata.AribaColumnMetadata;
import io.cdap.plugin.ariba.source.metadata.AribaResponseContainer;
import io.cdap.plugin.ariba.source.metadata.AribaSchemaGenerator;
@@ -57,8 +61,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.zip.ZipInputStream;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
@@ -100,18 +102,36 @@ public class AribaServices {
private static final String TYPE = "type";
private static final String UTC = "UTC";
private static final Logger LOG = LoggerFactory.getLogger(AribaServices.class);
- private static final int MAX_RETRIES = 5;
private final AribaConnectorConfig pluginConfig;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Gson gson = new Gson();
private int availableLimit;
- private boolean isDayLimitExhausted;
- private boolean isHourLimitExhausted;
- private boolean isMinuteLimitExhausted;
- private boolean isSecondsLimitExhausted;
+ boolean isDayLimitExhausted;
+ boolean isHourLimitExhausted;
+ boolean isMinuteLimitExhausted;
+ boolean isSecondsLimitExhausted;
- public AribaServices(AribaConnectorConfig pluginConfig) {
+ private final Integer initialRetryDuration;
+ private final Integer maxRetryDuration;
+ private final Integer maxRetryCount;
+ private final Integer retryMultiplier;
+
+ /**
+ * Determines if retry is required for the service call.
+ * If true, then the service call will be retried based on the retry configuration.
+ * If false, then the service call will not be retried.
+ */
+ private final boolean retryRequired;
+
+ public AribaServices(AribaConnectorConfig pluginConfig, Integer maxRetryCount,
+ Integer initialRetryDuration, Integer maxRetryDuration, Integer retryMultiplier,
+ boolean retryRequired) {
this.pluginConfig = pluginConfig;
+ this.maxRetryCount = maxRetryCount;
+ this.initialRetryDuration = initialRetryDuration;
+ this.maxRetryDuration = maxRetryDuration;
+ this.retryMultiplier = retryMultiplier;
+ this.retryRequired = retryRequired;
}
/**
@@ -366,16 +386,7 @@ public List getMetadata(String accessToken, String template
public JsonNode createJob(AribaPluginConfig aribaPluginConfig, @Nullable String pageToken, String templateName)
throws AribaException, IOException, InterruptedException {
Request req = buildJobRequest(jobBuilder(pageToken).build().url(), aribaPluginConfig, templateName);
- int count = 0;
- Response response;
- do {
- response = executeRequest(req);
- if (response.code() == ResourceConstants.API_LIMIT_EXCEED) {
- checkAndThrowException(response);
- count++;
- }
- } while (response.code() == ResourceConstants.API_LIMIT_EXCEED && count <= MAX_RETRIES);
-
+ Response response = executeRequest(req);
AribaResponseContainer responseContainer = tokenResponse(response);
InputStream responseStream = responseContainer.getResponseBody();
if (responseContainer.getHttpStatusCode() == HttpURLConnection.HTTP_OK) {
@@ -394,17 +405,11 @@ public JsonNode createJob(AribaPluginConfig aribaPluginConfig, @Nullable String
public JsonNode fetchJobStatus(String accessToken, String jobId)
throws IOException, AribaException, InterruptedException {
URL url = fetchDataBuilder(jobId).build().url();
- int count = 0;
Request req = buildFetchRequest(url, accessToken);
Response response = null;
try {
- do {
- response = executeRequest(req);
- checkAndThrowException(response);
- count++;
- } while (response.code() == ResourceConstants.API_LIMIT_EXCEED && count <= MAX_RETRIES);
+ response = executeRequest(req);
AribaResponseContainer responseContainer = tokenResponse(response);
-
if (responseContainer.getHttpStatusCode() == HttpURLConnection.HTTP_OK) {
InputStream responseStream = responseContainer.getResponseBody();
JsonNode responseNode = objectMapper.readTree(responseStream);
@@ -418,7 +423,9 @@ public JsonNode fetchJobStatus(String accessToken, String jobId)
}
throw new AribaException(response.message(), response.code());
} finally {
- response.close();
+ if (response != null) {
+ response.close();
+ }
}
}
@@ -621,6 +628,8 @@ protected Request buildFetchRequest(URL endPoint, String accessToken) {
}
/**
+ * Executes the given Ariba request and returns the response.
+ *
* @param req request
* @return Response
* @throws AribaException AribaException
@@ -628,52 +637,65 @@ protected Request buildFetchRequest(URL endPoint, String accessToken) {
* @throws IOException IOException
*/
public Response executeRequest(Request req) throws AribaException, InterruptedException, IOException {
- OkHttpClient enhancedOkHttpClient = getConfiguredClient().build();
- Response response = null;
+ int actualMaxRetryCount = retryRequired ? maxRetryCount : 0;
+ RetryPolicy