From a60783bc696a9500958087ccb229d301f933021e Mon Sep 17 00:00:00 2001
From: psainics
Date: Tue, 21 May 2024 07:14:41 +0530
Subject: [PATCH] Add wildcard support for copy and move action

---
Review notes (text in this area is not part of the commit message):
- getMatchedPaths() now walks every result page with Page.iterateAll();
  Page.getValues() only exposes the page that was just fetched, so wildcard
  matches beyond the first page were skipped.
- getWildcardPathPrefix() splits with the already-compiled Pattern, and the
  two new static helpers carry a one-line Javadoc (hunk and diffstat counts
  are adjusted for the two extra lines).
- NOTE(review): this patch was re-laid-out from a flattened copy. Blank
  context lines were placed so each hunk matches its @@ counts; indentation,
  generic type arguments, the whitespace-only @param change and the author
  e-mail could not be recovered verbatim - confirm against the tree before
  applying. The StorageClient.java index hash predates the review changes.
- NOTE(review): glob matching relies on FileSystems.getDefault(), so path
  separator handling follows the host platform - verify on non-POSIX runners.

 docs/GCSCopy-action.md                        |  1 +
 docs/GCSMove-action.md                        |  1 +
 .../io/cdap/plugin/gcp/gcs/StorageClient.java | 46 ++++++++++++++++++-
 .../cdap/plugin/gcp/gcs/actions/GCSCopy.java  | 16 +++++--
 .../cdap/plugin/gcp/gcs/actions/GCSMove.java  | 15 ++++++-
 .../gcp/gcs/actions/SourceDestConfig.java     |  8 ++++
 .../plugin/gcp/gcs/StorageClientTest.java     | 42 +++++++++++++++++-
 7 files changed, 122 insertions(+), 7 deletions(-)

diff --git a/docs/GCSCopy-action.md b/docs/GCSCopy-action.md
index 09c60cf68b..b90cab7d69 100644
--- a/docs/GCSCopy-action.md
+++ b/docs/GCSCopy-action.md
@@ -25,6 +25,7 @@ Properties
 It can be found on the Dashboard in the Google Cloud Platform Console.
 
 **Source Path**: Path to a source object or directory.
+> Use `*` to copy multiple files. For example, `gs://demo0/prod/reports/*.csv` will copy all CSV files in the `reports` directory.
 
 **Destination Path**: Path to the destination. The bucket will be created if it does not exist.
 
diff --git a/docs/GCSMove-action.md b/docs/GCSMove-action.md
index 4914d2af11..77c259af31 100644
--- a/docs/GCSMove-action.md
+++ b/docs/GCSMove-action.md
@@ -26,6 +26,7 @@ Properties
 It can be found on the Dashboard in the Google Cloud Platform Console.
 
 **Source Path**: Path to a source object or directory.
+> Use `*` to move multiple files. For example, `gs://demo0/prod/reports/*.csv` will move all CSV files in the `reports` directory.
 
 **Destination Path**: Path to the destination. The bucket will be created if it does not exist.
 
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java b/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java
index aa0d9ef008..e124a4e737 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java
@@ -34,11 +34,17 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 
 /**
@@ -111,7 +117,7 @@ public void mapMetaDataForAllBlobs(String path, Consumer<Map<String, String>> fu
   /**
    * Creates the given bucket if it does not exists.
    *
-   * @param path the path of the bucket
+   * @param path        the path of the bucket
    * @param location    the location of bucket
    * @param cmekKeyName the name of the cmek key
    */
@@ -163,6 +169,44 @@ public void move(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolea
     pairTraverse(sourcePath, destPath, recursive, overwrite, BlobPair::move);
   }
 
+  /**
+   * Returns the paths of all objects, across every result page, that match the wildcard source path.
+   */
+  public List<GCSPath> getMatchedPaths(GCSPath sourcePath, boolean recursive, Pattern wildcardRegex) {
+    Page<Blob> blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(
+      getWildcardPathPrefix(sourcePath, wildcardRegex)
+    ));
+    List<String> blobPageNames = new ArrayList<>();
+    blobPage.iterateAll().forEach(blob -> blobPageNames.add(blob.getName()));
+    return getFilterMatchedPaths(sourcePath, blobPageNames, recursive);
+  }
+
+  /** Returns the literal part of the source object name that precedes the first wildcard; may be empty. */
+  static String getWildcardPathPrefix(GCSPath sourcePath, Pattern wildcardRegex) {
+    String pattern = sourcePath.getName();
+    String[] patternSplits = wildcardRegex.split(pattern);
+    // prefix may be empty
+    return patternSplits.length >= 1 ? patternSplits[0] : "";
+  }
+
+  /** Keeps the blob names matching the source glob; names ending in "/" are dropped unless recursive. */
+  static List<GCSPath> getFilterMatchedPaths(GCSPath sourcePath, List<String> blobPageNames, boolean recursive) {
+    Set<GCSPath> matchedPaths = new HashSet<>();
+    String globPattern = "glob:" + sourcePath.getName();
+    PathMatcher matcher = FileSystems.getDefault().getPathMatcher(globPattern);
+    for (String blobName : blobPageNames) {
+      if (matcher.matches(Paths.get(blobName))) {
+        LOG.debug("Blob name {} matches the glob pattern {}", blobName, globPattern);
+        String gcsPath = String.format("gs://%s/%s", sourcePath.getBucket(), blobName);
+        matchedPaths.add(GCSPath.from(gcsPath));
+      }
+    }
+    if (!recursive) {
+      matchedPaths.removeIf(path -> path.getName().endsWith("/"));
+    }
+    return new ArrayList<>(matchedPaths);
+  }
+
   /**
    * Gets source and destination pairs by traversing the source path. Consumes each pair after the directory structure
    * is completely traversed.
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java
index 25a984816c..97f34f6c81 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java
@@ -35,7 +35,9 @@
 import io.cdap.plugin.gcp.gcs.StorageClient;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import javax.annotation.Nullable;
 
 
@@ -76,9 +78,17 @@ public void run(ActionContext context) throws IOException {
 
     // create the destination bucket if not exist
     storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);
-    //noinspection ConstantConditions
-    storageClient.copy(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());
-
+    List<GCSPath> matchedPaths = new ArrayList<>();
+    if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
+      matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
+                                                   SourceDestConfig.WILDCARD_REGEX);
+    } else {
+      matchedPaths.add(config.getSourcePath());
+    }
+    for (GCSPath sourcePath : matchedPaths) {
+      //noinspection ConstantConditions
+      storageClient.copy(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
+    }
   }
 
   /**
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java
index 12cf0f9a0a..3602c33f13 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java
@@ -35,7 +35,9 @@
 import io.cdap.plugin.gcp.gcs.StorageClient;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import javax.annotation.Nullable;
 
 
@@ -75,8 +77,17 @@ public void run(ActionContext context) throws IOException {
 
     // create the destination bucket if not exist
     storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);
-    //noinspection ConstantConditions
-    storageClient.move(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());
+    List<GCSPath> matchedPaths = new ArrayList<>();
+    if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
+      matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
+                                                   SourceDestConfig.WILDCARD_REGEX);
+    } else {
+      matchedPaths.add(config.getSourcePath());
+    }
+    for (GCSPath sourcePath : matchedPaths) {
+      //noinspection ConstantConditions
+      storageClient.move(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
+    }
   }
 
   /**
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java
index 5e304988a6..e50ea84dd5 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java
@@ -33,7 +33,9 @@
 import io.cdap.plugin.gcp.gcs.GCSPath;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 
 /**
@@ -44,6 +46,7 @@ public class SourceDestConfig extends GCPConfig {
   public static final String NAME_DEST_PATH = "destPath";
   public static final String NAME_LOCATION = "location";
   public static final String READ_TIMEOUT = "readTimeout";
+  public static final Pattern WILDCARD_REGEX = Pattern.compile("[*]");
 
   @Name(NAME_SOURCE_PATH)
   @Macro
@@ -132,6 +135,11 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
       } catch (IllegalArgumentException e) {
         collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_DEST_PATH);
       }
+      if (WILDCARD_REGEX.matcher(destPath).find()) {
+        collector.addFailure("Destination path should not contain wildcard characters.",
+                             "Please remove the wildcard characters from the destination path.")
+          .withConfigProperty(NAME_DEST_PATH);
+      }
     }
     if (!containsMacro(NAME_CMEK_KEY)) {
       validateCmekKey(collector, arguments);
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/StorageClientTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/StorageClientTest.java
index ec94381684..8c4911ad5c 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/StorageClientTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/StorageClientTest.java
@@ -20,16 +20,19 @@
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
+import io.cdap.plugin.gcp.gcs.actions.SourceDestConfig;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.slf4j.Logger;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.times;
@@ -50,12 +53,20 @@ public class StorageClientTest {
 
   private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
   private final PrintStream originalOut = System.out;
+  private final List<String> blobPageNames = new ArrayList<>();
 
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     storageClient = new StorageClient(storage);
     System.setOut(new PrintStream(outContent));
+    // Setup blobPageNames
+    blobPageNames.add("mydir/test_web1/report.html");
+    blobPageNames.add("mydir/test_web2/report.html");
+    blobPageNames.add("mydir/test_web2/css/");
+    blobPageNames.add("mydir/test_web2/css/foo.css");
+    blobPageNames.add("mydir/test_mob1/report.html");
+    blobPageNames.add("mydir/test_mob2/report.html");
   }
 
   @After
@@ -144,4 +155,33 @@ public void testCreateBucketIfNotExists() {
     }
     Assert.fail("Test for detecting bucket creation failure did not succeed. No exception caught");
   }
+
+  @Test
+  public void testGetWildcardPathPrefix() {
+    Assert.assertEquals("mydir/test_web", StorageClient.getWildcardPathPrefix(
+      GCSPath.from("gs://my-bucket/mydir/test_web*/"), SourceDestConfig.WILDCARD_REGEX));
+    Assert.assertEquals("", StorageClient.getWildcardPathPrefix(
+      GCSPath.from("gs://my-bucket/*"), SourceDestConfig.WILDCARD_REGEX));
+  }
+
+  @Test
+  public void testFilterMatchedPaths() {
+    GCSPath sourcePath = GCSPath.from("gs://foobucket/mydir/test_web*/*");
+    List<GCSPath> filterMatchedPaths = StorageClient.getFilterMatchedPaths(sourcePath, blobPageNames, false);
+    filterMatchedPaths.sort(Comparator.comparing(GCSPath::getUri));
+    Assert.assertEquals(2, filterMatchedPaths.size());
+    Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web1/report.html"), filterMatchedPaths.get(0));
+    Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/report.html"), filterMatchedPaths.get(1));
+  }
+
+  @Test
+  public void testFilterMatchedPathsWithRecursive() {
+    GCSPath sourcePath = GCSPath.from("gs://foobucket/mydir/test_web*/*");
+    List<GCSPath> filterMatchedPaths = StorageClient.getFilterMatchedPaths(sourcePath, blobPageNames, true);
+    Assert.assertEquals(3, filterMatchedPaths.size());
+    filterMatchedPaths.sort(Comparator.comparing(GCSPath::getUri));
+    Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web1/report.html"), filterMatchedPaths.get(0));
+    Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/css/"), filterMatchedPaths.get(1));
+    Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/report.html"), filterMatchedPaths.get(2));
+  }
 }