Skip to content

Commit

Permalink
Merge pull request #1488 from cloudsufi/fem/action/GCSBucketCreate
Browse files Browse the repository at this point in the history
[PLUGIN-1842] Error Management catch known errors [GCSBucketCreate]
  • Loading branch information
psainics authored Jan 17, 2025
2 parents a2b9a52 + bdd55f3 commit a9967d5
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,23 @@

package io.cdap.plugin.gcp.common;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;

import java.io.IOException;
import java.util.List;

/**
* A custom ErrorDetailsProvider for GCP plugins.
*/
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";
static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";

/**
* Get a ProgramFailureException with the given error
Expand All @@ -53,7 +49,8 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
return null;
}
if (t instanceof HttpResponseException) {
return getProgramFailureException((HttpResponseException) t, errorContext);
return GCPErrorDetailsProviderUtil.getProgramFailureException((HttpResponseException) t,
getExternalDocumentationLink(), errorContext);
}
if (t instanceof IllegalArgumentException) {
return getProgramFailureException((IllegalArgumentException) t, errorContext);
Expand All @@ -65,58 +62,6 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
return null;
}

/**
* Get a ProgramFailureException with the given error
* information from {@link HttpResponseException}.
*
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
pair.getCorrectiveAction());

String errorMessage = e.getMessage();
String externalDocumentationLink = null;
if (e instanceof GoogleJsonResponseException) {
errorMessage = getErrorMessage((GoogleJsonResponseException) e);

externalDocumentationLink = getExternalDocumentationLink();
if (!Strings.isNullOrEmpty(externalDocumentationLink)) {

if (!errorReason.endsWith(".")) {
errorReason = errorReason + ".";
}
errorReason = String.format("%s For more details, see %s", errorReason,
externalDocumentationLink);
}
}

return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorReason, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage),
pair.getErrorType(), true, ErrorCodeType.HTTP, statusCode.toString(),
externalDocumentationLink, e);
}

private String getErrorMessage(GoogleJsonResponseException exception) {
if (!Strings.isNullOrEmpty(exception.getMessage())) {
return exception.getMessage();
}
if (exception.getDetails() != null) {
try {
return exception.getDetails().toPrettyString();
} catch (IOException e) {
return exception.getDetails().toString();
}
}
return exception.getMessage();
}


/**
* Get a ProgramFailureException with the given error
* information from {@link IllegalArgumentException}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.common;


import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;

import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;

/**
* Common functions for GCP error details provider related functionalities.
*/
public final class GCPErrorDetailsProviderUtil {

/**
* Get a ProgramFailureException with the given error
* information from {@link HttpResponseException}.
*
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
public static ProgramFailureException getProgramFailureException(HttpResponseException e, String externalDocUrl,
@Nullable ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
pair.getCorrectiveAction());
String errorMessage = e.getMessage();
String externalDocumentationLink = null;
if (e instanceof GoogleJsonResponseException) {
errorMessage = getErrorMessage((GoogleJsonResponseException) e);
externalDocumentationLink = externalDocUrl;
if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
if (!errorReason.endsWith(".")) {
errorReason = errorReason + ".";
}
errorReason = String.format("%s For more details, see %s", errorReason, externalDocumentationLink);
}
}
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
errorContext != null ?
String.format(GCPErrorDetailsProvider.ERROR_MESSAGE_FORMAT, errorContext.getPhase(), e.getClass().getName(),
errorMessage) : String.format("%s: %s", e.getClass().getName(), errorMessage), pair.getErrorType(), true,
ErrorCodeType.HTTP, statusCode.toString(), externalDocumentationLink, e);
}

public static ProgramFailureException getHttpResponseExceptionDetailsFromChain(Exception e, String errorReason,
ErrorType errorType,
boolean dependency,
String externalDocUrl) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
if (t instanceof ProgramFailureException) {
// Avoid double wrap
return (ProgramFailureException) t;
}
if (t instanceof HttpResponseException) {
return getProgramFailureException((HttpResponseException) t, externalDocUrl, null);
}
}
// If no HttpResponseException found in the causal chain, return generic program failure exception
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()), errorType, dependency, e);
}

private static String getErrorMessage(GoogleJsonResponseException exception) {
if (!Strings.isNullOrEmpty(exception.getMessage())) {
return exception.getMessage();
}
if (exception.getDetails() != null) {
try {
return exception.getDetails().toPrettyString();
} catch (IOException e) {
return exception.getDetails().toString();
}
}
return exception.getMessage();
}
}
48 changes: 30 additions & 18 deletions src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,26 @@

package io.cdap.plugin.gcp.gcs.actions;

import com.google.api.pathtemplate.ValidationException;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.Arguments;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPConfig;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSPath;
import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -86,9 +83,16 @@ public void run(ActionContext context) throws Exception {
return;
}
String serviceAccount = config.getServiceAccount();
Credentials credentials = serviceAccount == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath);

Credentials credentials = null;
try {
credentials = serviceAccount == null ? null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
isServiceAccountFilePath);
} catch (IOException e) {
String errorReason = "Failed to load service account credentials.";
collector.addFailure(String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
Map<String, String> map = GCPUtils.generateGCSAuthProperties(serviceAccount, config.getServiceAccountType());
map.forEach(configuration::set);

Expand Down Expand Up @@ -125,19 +129,21 @@ public void run(ActionContext context) throws Exception {
Bucket bucket = null;
try {
bucket = storage.get(gcsPath.getBucket());
} catch (StorageException e) {
} catch (Exception e) {
// Add more descriptive error message
throw new RuntimeException(
String.format("Unable to access or create bucket %s. ", gcsPath.getBucket())
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
String errorReason = String.format("Unable to access GCS bucket '%s'", gcsPath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
if (bucket == null) {
GCPUtils.createBucket(storage, gcsPath.getBucket(), config.location, cmekKeyName);
undoBucket.add(bucketPath);
} else if (gcsPath.equals(bucketPath) && config.failIfExists()) {
// if the gcs path is just a bucket, and it exists, fail the pipeline
rollback = true;
throw new Exception(String.format("Path %s already exists", gcsPath));
String errorReason = String.format("Path %s already exists", gcsPath);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorReason, ErrorType.USER, true, null);
}
}

Expand All @@ -146,7 +152,9 @@ public void run(ActionContext context) throws Exception {
fs = gcsPath.getFileSystem(configuration);
} catch (IOException e) {
rollback = true;
throw new Exception("Unable to get GCS filesystem handler. " + e.getMessage(), e);
String errorReason = "Unable to get GCS filesystem handler.";
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
if (!fs.exists(gcsPath)) {
try {
Expand All @@ -156,12 +164,16 @@ public void run(ActionContext context) throws Exception {
} catch (IOException e) {
LOG.warn(String.format("Failed to create path '%s'", gcsPath));
rollback = true;
throw e;
String errorReason = String.format("Failed to create path %s.", gcsPath);
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason,
ErrorType.UNKNOWN, true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
} else {
if (config.failIfExists()) {
rollback = true;
throw new Exception(String.format("Path %s already exists", gcsPath));
String errorReason = String.format("Path %s already exists", gcsPath);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorReason, ErrorType.SYSTEM, true, null);
}
}
}
Expand Down

0 comments on commit a9967d5

Please sign in to comment.