Skip to content

Commit

Permalink
remove try-block from GCPUtils#getStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Jan 16, 2025
1 parent 6144535 commit 0ca5af8
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -112,14 +111,7 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
}

// Get the required bucket name and bucket instance (if it exists)
Storage storage;
try {
storage = GCPUtils.getStorage(project, credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.WRITING));
throw ex == null ? e : ex;
}
Storage storage = GCPUtils.getStorage(project, credentials);;
String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
dataset, config.getBucket());
bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
Expand Down Expand Up @@ -180,14 +179,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
dataset, config.getBucket());

// Configure GCS Bucket to use
Storage storage;
try {
storage = GCPUtils.getStorage(config.getProject(), credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);;
String bucket = null;
try {
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
Expand Down
12 changes: 1 addition & 11 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
Expand Down Expand Up @@ -141,14 +138,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
}

String bucketName = config.getBucket(collector);
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
Bucket bucket;
String location = null;
try {
Expand Down
12 changes: 1 addition & 11 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.api.plugin.PluginProperties;
Expand All @@ -42,9 +41,7 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
Expand Down Expand Up @@ -159,14 +156,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
}

String bucketName = config.getBucket(collector);
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
try {
if (storage.get(bucketName) == null) {
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
Expand Down
12 changes: 1 addition & 11 deletions src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
Expand Down Expand Up @@ -121,14 +118,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
collector.getOrThrowException();
}

Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String location = null;
try {
// Get location of the source for lineage
Expand Down

0 comments on commit 0ca5af8

Please sign in to comment.