Skip to content

Commit

Permalink
Merge pull request #1418 from cloudsufi/feat/cp-wildcard
Browse files Browse the repository at this point in the history
[PLUGIN-698] Add wildcard support for copy and move action
  • Loading branch information
psainics authored Jun 12, 2024
2 parents 8d2cbc9 + a60783b commit de01bf2
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 7 deletions.
1 change: 1 addition & 0 deletions docs/GCSCopy-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Properties
It can be found on the Dashboard in the Google Cloud Platform Console.

**Source Path**: Path to a source object or directory.
> Use `*` to copy multiple files. For example, `gs://demo0/prod/reports/*.csv` will copy all CSV files in the `reports` directory.

**Destination Path**: Path to the destination. The bucket will be created if it does not exist.

Expand Down
1 change: 1 addition & 0 deletions docs/GCSMove-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Properties
It can be found on the Dashboard in the Google Cloud Platform Console.

**Source Path**: Path to a source object or directory.
> Use `*` to move multiple files. For example, `gs://demo0/prod/reports/*.csv` will move all CSV files in the `reports` directory.

**Destination Path**: Path to the destination. The bucket will be created if it does not exist.

Expand Down
44 changes: 43 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -111,7 +117,7 @@ public void mapMetaDataForAllBlobs(String path, Consumer<Map<String, String>> fu
/**
* Creates the given bucket if it does not exist.
*
* @param path the path of the bucket
* @param path the path of the bucket
* @param location the location of bucket
* @param cmekKeyName the name of the cmek key
*/
Expand Down Expand Up @@ -163,6 +169,42 @@ public void move(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolea
pairTraverse(sourcePath, destPath, recursive, overwrite, BlobPair::move);
}

/**
 * Lists the blobs in the source bucket whose names match the wildcard pattern held in the source path's name.
 *
 * <p>The listing is narrowed to the prefix computed by {@code getWildcardPathPrefix} and the collected blob
 * names are then filtered by {@code getFilterMatchedPaths}.
 *
 * @param sourcePath the source path; its bucket is listed and its name is used as the wildcard pattern
 * @param recursive passed through to the filter; when false, matched names ending with "/" are dropped
 * @param wildcardRegex the regex used to split the name and derive the listing prefix
 * @return the matching paths; the order is unspecified because the filter collects them in a hash set
 */
public List<GCSPath> getMatchedPaths(GCSPath sourcePath, boolean recursive, Pattern wildcardRegex) {
  String listingPrefix = getWildcardPathPrefix(sourcePath, wildcardRegex);
  Page<Blob> blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(listingPrefix));
  List<String> blobPageNames = new ArrayList<>();
  // iterateAll() walks every page of the listing. getValues() only exposes the current page, which would
  // silently drop matching blobs whenever the listing spans more than one page.
  for (Blob blob : blobPage.iterateAll()) {
    blobPageNames.add(blob.getName());
  }
  return getFilterMatchedPaths(sourcePath, blobPageNames, recursive);
}

/**
 * Returns the first segment obtained by splitting the source path's name on the wildcard regex, i.e. the
 * text that precedes the first wildcard. Returns an empty string when the split yields no segments
 * (for example when the name consists only of the wildcard).
 */
static String getWildcardPathPrefix(GCSPath sourcePath, Pattern wildcardRegex) {
  String[] segments = sourcePath.getName().split(wildcardRegex.pattern());
  if (segments.length == 0) {
    // Nothing is left once the wildcard is split away, so there is no prefix.
    return "";
  }
  return segments[0];
}

/**
 * Keeps the blob names that match the glob pattern formed from the source path's name and converts them
 * into paths in the source bucket.
 *
 * <p>Matching is delegated to the default file system's glob {@link PathMatcher}. Duplicates are collapsed
 * through a hash set, so the order of the returned list is unspecified.
 *
 * @param sourcePath the source path; its name is the glob pattern and its bucket is used for the results
 * @param blobPageNames the candidate blob names
 * @param recursive when false, matched paths whose name ends with "/" are removed from the result
 * @return a new list holding the matched paths
 */
static List<GCSPath> getFilterMatchedPaths(GCSPath sourcePath, List<String> blobPageNames, boolean recursive) {
  Set<GCSPath> uniqueMatches = new HashSet<>();
  String globPattern = "glob:" + sourcePath.getName();
  PathMatcher globMatcher = FileSystems.getDefault().getPathMatcher(globPattern);
  for (String candidate : blobPageNames) {
    if (!globMatcher.matches(Paths.get(candidate))) {
      continue;
    }
    LOG.debug("Blob name {} matches the glob pattern {}", candidate, globPattern);
    uniqueMatches.add(GCSPath.from(String.format("gs://%s/%s", sourcePath.getBucket(), candidate)));
  }
  if (!recursive) {
    // Names ending with "/" are only kept for recursive operations.
    uniqueMatches.removeIf(matched -> matched.getName().endsWith("/"));
  }
  return new ArrayList<>(uniqueMatches);
}

/**
* Gets source and destination pairs by traversing the source path. Consumes each pair after the directory structure
* is completely traversed.
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import io.cdap.plugin.gcp.gcs.StorageClient;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;


Expand Down Expand Up @@ -76,9 +78,17 @@ public void run(ActionContext context) throws IOException {
// create the destination bucket if not exist
storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);

//noinspection ConstantConditions
storageClient.copy(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());

List<GCSPath> matchedPaths = new ArrayList<>();
if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
SourceDestConfig.WILDCARD_REGEX);
} else {
matchedPaths.add(config.getSourcePath());
}
for (GCSPath sourcePath : matchedPaths) {
//noinspection ConstantConditions
storageClient.copy(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
}
}

/**
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import io.cdap.plugin.gcp.gcs.StorageClient;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;


Expand Down Expand Up @@ -75,8 +77,17 @@ public void run(ActionContext context) throws IOException {
// create the destination bucket if not exist
storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);

//noinspection ConstantConditions
storageClient.move(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());
List<GCSPath> matchedPaths = new ArrayList<>();
if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
SourceDestConfig.WILDCARD_REGEX);
} else {
matchedPaths.add(config.getSourcePath());
}
for (GCSPath sourcePath : matchedPaths) {
//noinspection ConstantConditions
storageClient.move(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import io.cdap.plugin.gcp.gcs.GCSPath;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
Expand All @@ -44,6 +46,7 @@ public class SourceDestConfig extends GCPConfig {
public static final String NAME_DEST_PATH = "destPath";
public static final String NAME_LOCATION = "location";
public static final String READ_TIMEOUT = "readTimeout";
public static final Pattern WILDCARD_REGEX = Pattern.compile("[*]");

@Name(NAME_SOURCE_PATH)
@Macro
Expand Down Expand Up @@ -132,6 +135,11 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
} catch (IllegalArgumentException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_DEST_PATH);
}
if (WILDCARD_REGEX.matcher(destPath).find()) {
collector.addFailure("Destination path should not contain wildcard characters.",
"Please remove the wildcard characters from the destination path.")
.withConfigProperty(NAME_DEST_PATH);
}
}
if (!containsMacro(NAME_CMEK_KEY)) {
validateCmekKey(collector, arguments);
Expand Down
42 changes: 41 additions & 1 deletion src/test/java/io/cdap/plugin/gcp/gcs/StorageClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import io.cdap.plugin.gcp.gcs.actions.SourceDestConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
Expand All @@ -50,12 +53,20 @@ public class StorageClientTest {
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();

private final PrintStream originalOut = System.out;
private final List<String> blobPageNames = new ArrayList<>();

@Before
public void setUp() {
  MockitoAnnotations.initMocks(this);
  storageClient = new StorageClient(storage);
  // Capture anything written to System.out in outContent for the duration of the test.
  System.setOut(new PrintStream(outContent));
  // Blob names used as input by the wildcard filter tests.
  String[] fixtureNames = {
    "mydir/test_web1/report.html",
    "mydir/test_web2/report.html",
    "mydir/test_web2/css/",
    "mydir/test_web2/css/foo.css",
    "mydir/test_mob1/report.html",
    "mydir/test_mob2/report.html"
  };
  for (String fixtureName : fixtureNames) {
    blobPageNames.add(fixtureName);
  }
}

@After
Expand Down Expand Up @@ -144,4 +155,33 @@ public void testCreateBucketIfNotExists() {
}
Assert.fail("Test for detecting bucket creation failure did not succeed. No exception caught");
}

@Test
public void testGetWildcardPathPrefix() {
  // The text in front of the wildcard is returned as the prefix.
  String directoryPrefix = StorageClient.getWildcardPathPrefix(
    GCSPath.from("gs://my-bucket/mydir/test_web*/"), SourceDestConfig.WILDCARD_REGEX);
  Assert.assertEquals("mydir/test_web", directoryPrefix);

  // A name made up of only the wildcard produces an empty prefix.
  String bucketRootPrefix = StorageClient.getWildcardPathPrefix(
    GCSPath.from("gs://my-bucket/*"), SourceDestConfig.WILDCARD_REGEX);
  Assert.assertEquals("", bucketRootPrefix);
}

@Test
public void testFilterMatchedPaths() {
  GCSPath wildcardSource = GCSPath.from("gs://foobucket/mydir/test_web*/*");
  List<GCSPath> matches = StorageClient.getFilterMatchedPaths(wildcardSource, blobPageNames, false);
  // Sort by URI so the positional assertions below do not depend on the returned order.
  matches.sort(Comparator.comparing(GCSPath::getUri));

  // Non-recursive: the "css/" entry is excluded and only the two report files remain.
  String[] expectedUris = {
    "gs://foobucket/mydir/test_web1/report.html",
    "gs://foobucket/mydir/test_web2/report.html"
  };
  Assert.assertEquals(2, matches.size());
  for (int i = 0; i < expectedUris.length; i++) {
    Assert.assertEquals(GCSPath.from(expectedUris[i]), matches.get(i));
  }
}

@Test
public void testFilterMatchedPathsWithRecursive() {
  GCSPath wildcardSource = GCSPath.from("gs://foobucket/mydir/test_web*/*");
  List<GCSPath> matches = StorageClient.getFilterMatchedPaths(wildcardSource, blobPageNames, true);
  Assert.assertEquals(3, matches.size());

  // Sort by URI so the positional assertions below do not depend on the returned order.
  matches.sort(Comparator.comparing(GCSPath::getUri));
  // Recursive: the "css/" entry is kept alongside the two report files.
  String[] expectedUris = {
    "gs://foobucket/mydir/test_web1/report.html",
    "gs://foobucket/mydir/test_web2/css/",
    "gs://foobucket/mydir/test_web2/report.html"
  };
  for (int i = 0; i < expectedUris.length; i++) {
    Assert.assertEquals(GCSPath.from(expectedUris[i]), matches.get(i));
  }
}
}

0 comments on commit de01bf2

Please sign in to comment.