From f9ab00aedc0f748eef11720c12bda59111153a4e Mon Sep 17 00:00:00 2001 From: mccullen Date: Fri, 6 Aug 2021 11:31:15 -0400 Subject: [PATCH] Finish s3 bucket reader --- reference/piper-files/s3-bucket-reader.piper | 16 +++++ src/main/java/icapa/Util.java | 2 +- src/main/java/icapa/cr/S3BucketReader.java | 66 +++++++++---------- .../services/DelimiterReaderService.java | 9 +-- src/test/java/PlaygroundTest.java | 16 +---- 5 files changed, 56 insertions(+), 53 deletions(-) create mode 100644 reference/piper-files/s3-bucket-reader.piper diff --git a/reference/piper-files/s3-bucket-reader.piper b/reference/piper-files/s3-bucket-reader.piper new file mode 100644 index 0000000..5b41a72 --- /dev/null +++ b/reference/piper-files/s3-bucket-reader.piper @@ -0,0 +1,16 @@ +// Custom reader +reader icapa.cr.S3BucketReader Bucket=analysis Test=true +load ./resources/org/apache/ctakes/clinical/pipeline/DefaultFastPipeline.piper +//add icapa.cc.S3FileOntologyWriter KeepAll=true Key=ontology Bucket=analysis +add FileTreeXmiWriter OutputDirectory="./out/" + +// RUN COMMAND: bin/runPiperFile.bat -p C:\root\vdt\icapa\nlp\custom-components-repos\custom-components\reference\piper-files\s3.piper --key + +// localstack start +// aws configure (use any fake credentials and I used us-east-1 region) +// aws --endpoint-url=http://localhost:4566 s3 mb s3://analysis +// aws --endpoint-url=http://localhost:4566 s3 cp C:/root/vdt/icapa/nlp/test-data/notes/stress-test.txt s3://analysis +// aws --endpoint-url=http://localhost:4566 s3 cp C:/root/vdt/icapa/nlp/test-data/notes/stress-test-2.txt s3://analysis +// Run the piper file... +// aws --endpoint-url=http://localhost:4566 s3 cp s3://analysis C:/root/vdt/icapa/nlp/test-data/ --recursive + diff --git a/src/main/java/icapa/Util.java b/src/main/java/icapa/Util.java index 6bbc4d9..6ebb971 100644 --- a/src/main/java/icapa/Util.java +++ b/src/main/java/icapa/Util.java @@ -149,7 +149,7 @@ public static AmazonS3 getS3Client() { AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard() .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1")) .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("test", "test"))); - //builder.setPathStyleAccessEnabled(true); // Not sure if you need this? + builder.setPathStyleAccessEnabled(true); // Need this to get it to find localhost for some reason... AmazonS3 s3Client = builder.build(); return s3Client; } diff --git a/src/main/java/icapa/cr/S3BucketReader.java b/src/main/java/icapa/cr/S3BucketReader.java index 4becfb6..104e377 100644 --- a/src/main/java/icapa/cr/S3BucketReader.java +++ b/src/main/java/icapa/cr/S3BucketReader.java @@ -4,7 +4,10 @@ import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.iterable.S3Objects; import com.amazonaws.services.s3.model.*; +import icapa.Util; +import org.apache.ctakes.typesystem.type.structured.DocumentID; import org.apache.log4j.Logger; import org.apache.uima.UimaContext; import org.apache.uima.collection.CollectionException; @@ -28,44 +31,47 @@ public class S3BucketReader extends JCasCollectionReader_ImplBase { @ConfigurationParameter( name = PARAM_BUCKET, description = "Bucket from which to read documents", - mandatory = false, - defaultValue = "0" + mandatory = true, + defaultValue = "" ) private String _bucket; - /* - static public final String PARAM_SERVICE_ENDPOINT = "ServiceEndpoint"; + static public final String PARAM_PREFIX = "Prefix"; @ConfigurationParameter( - name = PARAM_SERVICE_ENDPOINT, - description = "Service endpoint where bucket resides", + name = PARAM_PREFIX, + description = "Prefix to use for getting notes from the given bucket", mandatory = false, - defaultValue = "0" + defaultValue = "" ) - private int _serviceEndpoint; + private String _prefix; - static public final String PARAM_SIGNING_REGION = "SigningRegion"; + static public final String PARAM_TEST = "Test"; @ConfigurationParameter( - name = PARAM_BUCKET, - description = "Region where bucket is located", + name = PARAM_TEST, + description = "Set to true if testing on localstack for testing", mandatory = false, - defaultValue = "0" + defaultValue = "false" ) - private int _signingRegion; - */ + private boolean _test; private AmazonS3 _s3Client; - private ListObjectsV2Result _result; - private ListObjectsV2Request _req; private Iterator _objectSummaries; @Override public void initialize(UimaContext context) throws ResourceInitializationException { super.initialize(context); - _s3Client = AmazonS3ClientBuilder.defaultClient(); - - // maxKeys is set to 1000 to demonstrate the use of - // ListObjectsV2Result.getNextContinuationToken() - _req = new ListObjectsV2Request().withBucketName(_bucket).withMaxKeys(1000); + if (_test) { + _s3Client = Util.getS3Client(); + } else { + _s3Client = AmazonS3ClientBuilder.defaultClient(); + } + S3Objects s3Objects; + if (_prefix != null && !_prefix.equals("")) { + s3Objects = S3Objects.withPrefix(_s3Client, _bucket, _prefix); + } else { + s3Objects = S3Objects.inBucket(_s3Client, _bucket); + } + _objectSummaries = s3Objects.iterator(); } @Override @@ -78,23 +84,15 @@ public void getNext(JCas jCas) { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); String documentText = bufferedReader.lines().collect(Collectors.joining("\n")); jCas.setDocumentText(documentText); + DocumentID documentID = new DocumentID(jCas); + String id = objectSummary.getKey(); + documentID.setDocumentID(id); + documentID.addToIndexes(); } @Override public boolean hasNext() { - if (_objectSummaries == null || !_objectSummaries.hasNext()) { - // No more objects on this page, so get next page - - _result = _s3Client.listObjectsV2(_req); - _objectSummaries = _result.getObjectSummaries().iterator(); - - // If there are more than maxKeys keys in the bucket, get a continuation token - // and list the next objects. - String token = _result.getNextContinuationToken(); - System.out.println("Next Continuation Token: " + token); - _req.setContinuationToken(token); - } - return (_objectSummaries != null && _objectSummaries.hasNext()) || _result.isTruncated(); + return _objectSummaries.hasNext(); } @Override diff --git a/src/main/java/icapa/services/DelimiterReaderService.java b/src/main/java/icapa/services/DelimiterReaderService.java index 8998425..4d5388a 100644 --- a/src/main/java/icapa/services/DelimiterReaderService.java +++ b/src/main/java/icapa/services/DelimiterReaderService.java @@ -17,8 +17,9 @@ import java.util.Map; public class DelimiterReaderService implements CollectionReader { - public static String[] currentLine; static private final Logger LOGGER = Logger.getLogger(DelimiterReaderService.class.getName()); + + public static String[] currentLine; private Reader _reader; private int _rowStart = 0; private int _rowEnd = 0; @@ -53,7 +54,7 @@ public static DelimiterReaderService from(DelimiterReaderParams params) { result._documentIdIndex = result._headerToIndex.get(params.getDocumentIdColumnName()); result._csvReader.skip(result._rowStart); } catch (Exception e) { - e.printStackTrace(); + LOGGER.error("Error reading next header line", e); } return result; } @@ -86,7 +87,7 @@ public void readNext(JCas jCas) { jCas.setDocumentText(""); } } catch (Exception e) { - e.printStackTrace(); + LOGGER.error("Error reading next delimiter line", e); } ++_currentRow; } @@ -112,7 +113,7 @@ public void destroy() { try { _csvReader.close(); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Error destroying delimiter reader", e); } } } diff --git a/src/test/java/PlaygroundTest.java b/src/test/java/PlaygroundTest.java index 2c086c9..418584b 100644 --- a/src/test/java/PlaygroundTest.java +++ b/src/test/java/PlaygroundTest.java @@ -20,9 +20,9 @@ public void testPipeline() throws Exception { PipelineBuilder pipelineBuilder = piperFileReader.getBuilder(); String umlsKey = getProperty("umls.key"); pipelineBuilder.set(UmlsUserApprover.KEY_PARAM, umlsKey); - pipelineBuilder.set(ConfigParameterConstants.PARAM_LOOKUP_XML, "org/apache/ctakes/dictionary/lookup/fast/icd.xml"); + //pipelineBuilder.set(ConfigParameterConstants.PARAM_LOOKUP_XML, "org/apache/ctakes/dictionary/lookup/fast/icd.xml"); //piperFileReader.loadPipelineFile("./reference/piper-files/stress-test.piper"); - piperFileReader.loadPipelineFile("./reference/piper-files/disorder-ae.piper"); + piperFileReader.loadPipelineFile("./reference/piper-files/s3-bucket-reader.piper"); pipelineBuilder.run(); } @@ -38,16 +38,4 @@ public static final String getProperty(String key) { } return property; } - - @Test - public void testRuta() throws Exception { - AnalysisEngine rutaEngine = AnalysisEngineFactory.createEngine( - RutaEngine.class, // - RutaEngine.PARAM_RESOURCE_PATHS, - "src/main/resources/ruta/resources",// - RutaEngine.PARAM_SCRIPT_PATHS, - "src/main/resources/ruta/scripts", - RutaEngine.PARAM_MAIN_SCRIPT, "Example"); - } - }