Skip to content

Commit

Permalink
S3 Delivery Stream Bug Fix (#2)
Browse files Browse the repository at this point in the history
- Timer was firing without buffer delay and before messages were written to stream
- Added Stream Unit Tests
  • Loading branch information
ptimson authored Feb 14, 2017
1 parent fa9e3ba commit 60e7acc
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 24 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Firehose Mock
[![Github All Releases](https://img.shields.io/github/downloads/ptimson/firehose-mock/total.svg?style=flat)](https://github.com/ptimson/firehose-mock/releases/)
An embedded Java mock for AWS Kinesis Firehose

## Getting Started
Expand Down Expand Up @@ -41,7 +40,7 @@ firehoseClient.putRecord(putRequest);
#### Create CreateDeliveryStream Request
```java
ExtendedS3DestinationConfiguration s3StreamConfig = AWSFirehoseUtil.createS3DeliveryStream()
.withS3BucketArn("arn:myBucketArn")
.withS3BucketArn("arn:aws:s3:::myBucket")
.withBufferIntervalSeconds(10)
.withBufferSizeMB(1024)
.withCompressionFormat(CompressionFormat.GZIP)
Expand Down
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.timson</groupId>
<artifactId>firehose-mock</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>

<name>Firehose Mock</name>
<url>https://github.com/ptimson/firehose-mock</url>
Expand Down Expand Up @@ -48,6 +48,7 @@
<hamcrest.all.version>1.3</hamcrest.all.version>
<jackson.databind.version>2.8.4</jackson.databind.version>
<java.version>1.8</java.version>
<jcabi.matchers.version>1.3</jcabi.matchers.version>
<jetty.server.version>9.4.0.v20161208</jetty.server.version>
<jetty.servlet.version>9.4.0.v20161208</jetty.servlet.version>
<junit.version>4.12</junit.version>
Expand Down Expand Up @@ -115,6 +116,12 @@
<version>${mockito.all.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jcabi</groupId>
<artifactId>jcabi-matchers</artifactId>
<version>${jcabi.matchers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
53 changes: 33 additions & 20 deletions src/main/java/io/timson/firehose/stream/S3DeliveryStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,29 @@ public class S3DeliveryStream implements DeliveryStream {
private final S3Client s3Client;
private final String s3Bucket;
private final String s3Prefix;
private final Integer bufferIntervalMs;
private final int bufferFlushSizeMb;
private final long bufferFlushSizeBytes;
private final Long bufferIntervalMs;
private final Long bufferFlushSizeBytes;
private final CompressionFormat compressionFormat;

private StringBuilder buffer = new StringBuilder();
private long bufferSize = 0;
private Timer flushTimer;
private TimerTask flushTimerTask;

private S3DeliveryStream(String name, S3Client s3Client, String s3BucketArn, String s3Prefix, Integer bufferIntervalSeconds, Integer bufferSizeMb, CompressionFormat compressionFormat) {
private S3DeliveryStream(String name,
S3Client s3Client,
String s3BucketArn,
String s3Prefix,
Long bufferIntervalMs,
Long bufferSizeBytes,
CompressionFormat compressionFormat) {
this.name = name;
this.s3Client = s3Client;
this.s3Bucket = extractBucketName(s3BucketArn);
this.s3Prefix = s3Prefix;
this.bufferIntervalMs = bufferIntervalSeconds * 1000;
this.bufferFlushSizeMb = bufferSizeMb;
this.bufferFlushSizeBytes = bufferSizeMb * MEGABYTE;
this.bufferIntervalMs = bufferIntervalMs;
this.bufferFlushSizeBytes = bufferSizeBytes;
this.compressionFormat = compressionFormat;
startFlushTimer();
}

private String extractBucketName(String s3BucketArn) {
Expand All @@ -68,7 +71,7 @@ public void run() {
}
};
flushTimer = new Timer();
flushTimer.schedule(flushTimerTask, 0, bufferIntervalMs);
flushTimer.schedule(flushTimerTask, bufferIntervalMs);
}

private void stopFlushTimer() {
Expand All @@ -77,19 +80,17 @@ private void stopFlushTimer() {

@Override
public synchronized void write(String data) {
if (bufferSize == 0) {
startFlushTimer();
}
buffer.append(data);
bufferSize += data.getBytes(UTF_8).length;
if (bufferSize >= bufferFlushSizeBytes) {
resetFlushTimer();
stopFlushTimer();
flush();
}
}

private void resetFlushTimer() {
stopFlushTimer();
startFlushTimer();
}

@Override
public void stop() {
stopFlushTimer();
Expand Down Expand Up @@ -140,8 +141,8 @@ public static class S3DeliveryStreamBuilder {
private S3Client s3Client;
private String s3BucketArn;
private String s3Prefix = "";
private Integer bufferIntervalSeconds = 300;
private Integer bufferSizeMb = 5;
private Long bufferIntervalMs = 300 * 1000L; // 300 s
private Long bufferSizeBytes = 5 * MEGABYTE; // 5 MiB
private CompressionFormat compressionFormat = UNCOMPRESSED;

public S3DeliveryStreamBuilder withName(String name) {
Expand All @@ -167,13 +168,25 @@ public S3DeliveryStreamBuilder withS3Prefix(String s3Prefix) {

public S3DeliveryStreamBuilder withBufferIntervalSeconds(Integer bufferIntervalSeconds) {
if (bufferIntervalSeconds == null) return this;
this.bufferIntervalSeconds = bufferIntervalSeconds;
this.bufferIntervalMs = bufferIntervalSeconds * 1000L;
return this;
}

public S3DeliveryStreamBuilder withBufferIntervalMilliseconds(Long bufferIntervalMilliseconds) {
if (bufferIntervalMilliseconds == null) return this;
this.bufferIntervalMs = bufferIntervalMilliseconds;
return this;
}

public S3DeliveryStreamBuilder withBufferSizeMB(Integer bufferSizeMb) {
if (bufferSizeMb == null) return this;
this.bufferSizeMb = bufferSizeMb;
this.bufferSizeBytes = bufferSizeMb * MEGABYTE;
return this;
}

public S3DeliveryStreamBuilder withBufferSizeBytes(Long bufferSizeBytes) {
if (bufferSizeBytes == null) return this;
this.bufferSizeBytes = bufferSizeBytes;
return this;
}

Expand All @@ -186,7 +199,7 @@ public S3DeliveryStreamBuilder withCompressionFormat(CompressionFormat compressi
public S3DeliveryStream build() {
if (isEmpty(name)) throw new IllegalArgumentException("Delivery stream name cannot be empty");
if (isEmpty(s3BucketArn)) throw new IllegalArgumentException("S3 Bucket ARN cannot be empty");
return new S3DeliveryStream(name, s3Client, s3BucketArn, s3Prefix, bufferIntervalSeconds, bufferSizeMb, compressionFormat);
return new S3DeliveryStream(name, s3Client, s3BucketArn, s3Prefix, bufferIntervalMs, bufferSizeBytes, compressionFormat);
}

}
Expand Down
143 changes: 142 additions & 1 deletion src/test/java/io/timson/firehose/stream/S3DeliveryStreamTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,147 @@
package io.timson.firehose.stream;

// TODO
import com.amazonaws.services.kinesisfirehose.model.CompressionFormat;
import io.timson.firehose.aws.S3Client;
import io.timson.firehose.stream.S3DeliveryStream.S3DeliveryStreamBuilder;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import static com.jcabi.matchers.RegexMatchers.matchesPattern;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

@RunWith(MockitoJUnitRunner.class)
public class S3DeliveryStreamTest {

private static final String STREAM_NAME = "myStream";
private static final String S3_BUCKET = "myBucketArn";
private static final String S3_PREFIX = "myPrefix/";
private static final String MESSAGE_1 = "m1";
private static final String MESSAGE_2 = "m2";
private static final String LONG_MESSAGE = "LongMessage!";

@Mock
private S3Client s3Client;

@Captor
private ArgumentCaptor<String> s3Path;

private S3DeliveryStream stream;

@Before
public void setUp() throws Exception {
stream = new S3DeliveryStreamBuilder()
.withName(STREAM_NAME)
.withS3Client(s3Client)
.withS3BucketArn("arn:aws:s3:::" + S3_BUCKET)
.withS3Prefix(S3_PREFIX)
.withCompressionFormat(CompressionFormat.UNCOMPRESSED)
.withBufferSizeBytes(10L)
.withBufferIntervalMilliseconds(50L)
.build();
}

@Test
public void shouldNotStartFlushTimer_WhenNoMessagesHaveBeenSent() throws Exception {
Thread.sleep(190);
stream.write(MESSAGE_1);
stream.write(MESSAGE_2);

verifyNoMoreInteractions(s3Client);
}

@Test
public void shouldNotFlush_WhenFlushTimerNotExpired() throws Exception {
Thread.sleep(40);
stream.write(MESSAGE_1);
Thread.sleep(45);
stream.write(MESSAGE_2);

verifyNoMoreInteractions(s3Client);
}

@Test
public void shouldFlushOnce_WhenFlushTimerExpired() throws Exception {
stream.write(MESSAGE_1);
stream.write(MESSAGE_2);
stream.write(MESSAGE_2);
Thread.sleep(55);
Thread.sleep(55);

verify(s3Client, only()).createObject(eq(S3_BUCKET), anyString(), eq("m1m2m2"));
}

@Test
public void shouldFlushOnce_WhenReachesBufferSize() throws Exception {
stream.write(MESSAGE_1);
verifyNoMoreInteractions(s3Client);

stream.write(LONG_MESSAGE);
verify(s3Client, only()).createObject(eq(S3_BUCKET), anyString(), eq("m1LongMessage!"));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentException_WhenInvalidS3Arn() throws Exception {
new S3DeliveryStreamBuilder().withS3BucketArn("Invalid").build();
}

@Test
public void shouldCreateS3ObjectWithValidPrefix() throws Exception {
stream.write(LONG_MESSAGE);

final String regex = "myPrefix\\/" + "[0-9]{4}\\/(?:[0-9]{2}\\/){3}" + STREAM_NAME
+ "-1-[0-9]{4}-(?:[0-9]{2}-){5}" + "[0-9a-f\\-]{36}";

verify(s3Client, only()).createObject(eq(S3_BUCKET), s3Path.capture(), eq(LONG_MESSAGE));
assertThat(s3Path.getValue(), matchesPattern(regex));
}

@Test
public void shouldCreateGzipS3Object_WhenEncryptionTypeIsGzip() throws Exception {
stream = new S3DeliveryStreamBuilder().withName(STREAM_NAME)
.withS3Client(s3Client)
.withS3BucketArn("arn:aws:s3:::" + S3_BUCKET)
.withBufferSizeBytes(1L)
.withCompressionFormat(CompressionFormat.GZIP)
.build();
stream.write(MESSAGE_1);

verify(s3Client, only()).createGzipObject(eq(S3_BUCKET), anyString(), eq(MESSAGE_1));
}

@Test
public void shouldCreateZipS3Object_WhenEncryptionTypeIsZip() throws Exception {
stream = new S3DeliveryStreamBuilder().withName(STREAM_NAME)
.withS3Client(s3Client)
.withS3BucketArn("arn:aws:s3:::" + S3_BUCKET)
.withBufferSizeBytes(1L)
.withCompressionFormat(CompressionFormat.ZIP)
.build();
stream.write(MESSAGE_1);

verify(s3Client, only()).createZipObject(eq(S3_BUCKET), anyString(), eq(MESSAGE_1));
}

@Test
public void shouldCreateSnappyS3Object_WhenEncryptionTypeIsSnappy() throws Exception {
stream = new S3DeliveryStreamBuilder().withName(STREAM_NAME)
.withS3Client(s3Client)
.withS3BucketArn("arn:aws:s3:::" + S3_BUCKET)
.withBufferSizeBytes(1L)
.withCompressionFormat(CompressionFormat.Snappy)
.build();
stream.write(MESSAGE_1);

verify(s3Client, only()).createSnappyObject(eq(S3_BUCKET), anyString(), eq(MESSAGE_1));
}

}

0 comments on commit 60e7acc

Please sign in to comment.