Skip to content

Commit

Permalink
Finish s3 bucket reader
Browse files Browse the repository at this point in the history
  • Loading branch information
mccullen committed Aug 6, 2021
1 parent 815a914 commit f9ab00a
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 53 deletions.
16 changes: 16 additions & 0 deletions reference/piper-files/s3-bucket-reader.piper
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Custom reader
reader icapa.cr.S3BucketReader Bucket=analysis Test=true
load ./resources/org/apache/ctakes/clinical/pipeline/DefaultFastPipeline.piper
//add icapa.cc.S3FileOntologyWriter KeepAll=true Key=ontology Bucket=analysis
add FileTreeXmiWriter OutputDirectory="./out/"

// RUN COMMAND: bin/runPiperFile.bat -p C:\root\vdt\icapa\nlp\custom-components-repos\custom-components\reference\piper-files\s3.piper --key <key>

// localstack start
// aws configure (use any fake credentials and I used us-east-1 region)
// aws --endpoint-url=http://localhost:4566 s3 mb s3://analysis
// aws --endpoint-url=http://localhost:4566 s3 cp C:/root/vdt/icapa/nlp/test-data/notes/stress-test.txt s3://analysis
// aws --endpoint-url=http://localhost:4566 s3 cp C:/root/vdt/icapa/nlp/test-data/notes/stress-test-2.txt s3://analysis
// Run the piper file...
// aws --endpoint-url=http://localhost:4566 s3 cp s3://analysis C:/root/vdt/icapa/nlp/test-data/ --recursive

2 changes: 1 addition & 1 deletion src/main/java/icapa/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public static AmazonS3 getS3Client() {
AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("test", "test")));
//builder.setPathStyleAccessEnabled(true); // Not sure if you need this?
builder.setPathStyleAccessEnabled(true); // Need this to get it to find localhost for some reason...
AmazonS3 s3Client = builder.build();
return s3Client;
}
Expand Down
66 changes: 32 additions & 34 deletions src/main/java/icapa/cr/S3BucketReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.*;
import icapa.Util;
import org.apache.ctakes.typesystem.type.structured.DocumentID;
import org.apache.log4j.Logger;
import org.apache.uima.UimaContext;
import org.apache.uima.collection.CollectionException;
Expand All @@ -28,44 +31,47 @@ public class S3BucketReader extends JCasCollectionReader_ImplBase {
@ConfigurationParameter(
name = PARAM_BUCKET,
description = "Bucket from which to read documents",
mandatory = false,
defaultValue = "0"
mandatory = true,
defaultValue = ""
)
private String _bucket;

/*
static public final String PARAM_SERVICE_ENDPOINT = "ServiceEndpoint";
static public final String PARAM_PREFIX = "Prefix";
@ConfigurationParameter(
name = PARAM_SERVICE_ENDPOINT,
description = "Service endpoint where bucket resides",
name = PARAM_PREFIX,
description = "Prefix to use for getting notes from the given bucket",
mandatory = false,
defaultValue = "0"
defaultValue = ""
)
private int _serviceEndpoint;
private String _prefix;

static public final String PARAM_SIGNING_REGION = "SigningRegion";
static public final String PARAM_TEST = "Test";
@ConfigurationParameter(
name = PARAM_BUCKET,
description = "Region where bucket is located",
name = PARAM_TEST,
description = "Set to true if testing on localstack for testing",
mandatory = false,
defaultValue = "0"
defaultValue = "false"
)
private int _signingRegion;
*/
private boolean _test;

private AmazonS3 _s3Client;
private ListObjectsV2Result _result;
private ListObjectsV2Request _req;
private Iterator<S3ObjectSummary> _objectSummaries;

@Override
public void initialize(UimaContext context) throws ResourceInitializationException {
super.initialize(context);
_s3Client = AmazonS3ClientBuilder.defaultClient();

// maxKeys is set to 1000 to demonstrate the use of
// ListObjectsV2Result.getNextContinuationToken()
_req = new ListObjectsV2Request().withBucketName(_bucket).withMaxKeys(1000);
if (_test) {
_s3Client = Util.getS3Client();
} else {
_s3Client = AmazonS3ClientBuilder.defaultClient();
}
S3Objects s3Objects;
if (_prefix != null && !_prefix.equals("")) {
s3Objects = S3Objects.withPrefix(_s3Client, _bucket, _prefix);
} else {
s3Objects = S3Objects.inBucket(_s3Client, _bucket);
}
_objectSummaries = s3Objects.iterator();
}

@Override
Expand All @@ -78,23 +84,15 @@ public void getNext(JCas jCas) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String documentText = bufferedReader.lines().collect(Collectors.joining("\n"));
jCas.setDocumentText(documentText);
DocumentID documentID = new DocumentID(jCas);
String id = objectSummary.getKey();
documentID.setDocumentID(id);
documentID.addToIndexes();
}

@Override
public boolean hasNext() {
if (_objectSummaries == null || !_objectSummaries.hasNext()) {
// No more objects on this page, so get next page

_result = _s3Client.listObjectsV2(_req);
_objectSummaries = _result.getObjectSummaries().iterator();

// If there are more than maxKeys keys in the bucket, get a continuation token
// and list the next objects.
String token = _result.getNextContinuationToken();
System.out.println("Next Continuation Token: " + token);
_req.setContinuationToken(token);
}
return (_objectSummaries != null && _objectSummaries.hasNext()) || _result.isTruncated();
return _objectSummaries.hasNext();
}

@Override
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/icapa/services/DelimiterReaderService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import java.util.Map;

public class DelimiterReaderService implements CollectionReader {
public static String[] currentLine;
static private final Logger LOGGER = Logger.getLogger(DelimiterReaderService.class.getName());

public static String[] currentLine;
private Reader _reader;
private int _rowStart = 0;
private int _rowEnd = 0;
Expand Down Expand Up @@ -53,7 +54,7 @@ public static DelimiterReaderService from(DelimiterReaderParams params) {
result._documentIdIndex = result._headerToIndex.get(params.getDocumentIdColumnName());
result._csvReader.skip(result._rowStart);
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("Error reading next header line", e);
}
return result;
}
Expand Down Expand Up @@ -86,7 +87,7 @@ public void readNext(JCas jCas) {
jCas.setDocumentText("");
}
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("Error reading next delimiter line", e);
}
++_currentRow;
}
Expand All @@ -112,7 +113,7 @@ public void destroy() {
try {
_csvReader.close();
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Error destroying delimiter reader", e);
}
}
}
16 changes: 2 additions & 14 deletions src/test/java/PlaygroundTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public void testPipeline() throws Exception {
PipelineBuilder pipelineBuilder = piperFileReader.getBuilder();
String umlsKey = getProperty("umls.key");
pipelineBuilder.set(UmlsUserApprover.KEY_PARAM, umlsKey);
pipelineBuilder.set(ConfigParameterConstants.PARAM_LOOKUP_XML, "org/apache/ctakes/dictionary/lookup/fast/icd.xml");
//pipelineBuilder.set(ConfigParameterConstants.PARAM_LOOKUP_XML, "org/apache/ctakes/dictionary/lookup/fast/icd.xml");
//piperFileReader.loadPipelineFile("./reference/piper-files/stress-test.piper");
piperFileReader.loadPipelineFile("./reference/piper-files/disorder-ae.piper");
piperFileReader.loadPipelineFile("./reference/piper-files/s3-bucket-reader.piper");
pipelineBuilder.run();
}

Expand All @@ -38,16 +38,4 @@ public static final String getProperty(String key) {
}
return property;
}

@Test
public void testRuta() throws Exception {
AnalysisEngine rutaEngine = AnalysisEngineFactory.createEngine(
RutaEngine.class, //
RutaEngine.PARAM_RESOURCE_PATHS,
"src/main/resources/ruta/resources",//
RutaEngine.PARAM_SCRIPT_PATHS,
"src/main/resources/ruta/scripts",
RutaEngine.PARAM_MAIN_SCRIPT, "Example");
}

}

0 comments on commit f9ab00a

Please sign in to comment.