diff --git a/README.md b/README.md index fee65ca5..596c59fc 100644 --- a/README.md +++ b/README.md @@ -346,6 +346,24 @@ config.setAgentEndpoint("udp://127.0.0.1:1000"); AWS_EMF_AGENT_ENDPOINT="udp://127.0.0.1:1000" ``` +## Thread-safety + +### Internal Synchronization + +The MetricsLogger class is thread-safe. Specifically, the generalized multi-threading use cases for this library are: + +1. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into one or more async contexts where new metrics or metadata can be added concurrently; Join the async contexts (e.g. Future.get()) and flush the metrics. +2. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into an async context; Flush from the async context concurrently. + +Thread-safety for the first use case is achieved by introducing concurrent internal data structures and atomic operations associated with these models, to ensure the access to shared mutable resources are always synchronized. + +Thread-safety for the second use case is achieved by using a ReentrantReadWriteLock. This lock is used to create an internal sync context for flush() method in multi-threading situations. `flush()` acquires write lock, while other methods (which have access to mutable shared data with `flush()`) acquires read lock. This makes sure `flush()` is always executed exclusively, while other methods can be executed concurrently. + +### Use Cases that are Not Covered + +With all the internal synchronization measures, however, there're still certain multi-threading use cases that are not covered by this library, which might require external synchronizations or other protection measures. +This is due to the fact that the execution order of APIs are not determined in async contexts. For example, if user needs to associate a given set of properties with a metric in each thread, the results are not guaranteed since the execution order of `putProperty()` is not determined across threads. In such cases, we recommend using a different MetricsLogger instance for different threads, so that no resources are shared and no thread-safety problem would ever happen. Note that this can often be simplified by using a ThreadLocal variable. + ## Examples Check out the [examples](https://github.com/awslabs/aws-embedded-metrics-java/tree/master/examples) directory to get started. @@ -392,6 +410,14 @@ To auto fix code style, run ./gradlew :spotlessApply ``` +### Benchmark + +We use [JMH](https://github.com/openjdk/jmh) as our framework for concurrency performance benchmarking. Benchmarks can be run by: +``` +./gradlew jmh +``` +To run a single benchmark, consider using JMH plugins. For example, [JMH plugin for IntelliJ IDEA](https://github.com/artyushov/idea-jmh-plugin) + ## License This project is licensed under the Apache-2.0 License. diff --git a/build.gradle b/build.gradle index db0a2026..f259b381 100644 --- a/build.gradle +++ b/build.gradle @@ -18,17 +18,18 @@ plugins { id 'com.diffplug.spotless' version '5.8.2' id 'maven-publish' id 'signing' + id "me.champeau.jmh" version "0.6.6" } group "software.amazon.cloudwatchlogs" allprojects { compileJava { - sourceCompatibility = '1.8' - targetCompatibility = '1.8' + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } - version = '3.0.0-beta-1' + version = '3.1.0' } java { @@ -78,6 +79,10 @@ dependencies { testImplementation 'software.amazon.awssdk:cloudwatch:2.13.54' testCompileOnly 'org.projectlombok:lombok:1.18.12' testAnnotationProcessor 'org.projectlombok:lombok:1.18.12' + + implementation 'org.openjdk.jmh:jmh-core:1.29' + implementation 'org.openjdk.jmh:jmh-generator-annprocess:1.29' + jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.29' } spotless { @@ -124,7 +129,7 @@ tasks.withType(JavaCompile) { } tasks.named('wrapper') { - gradleVersion = '6.5.1' + gradleVersion = '7.4.2' distributionType = Wrapper.DistributionType.ALL } diff --git a/buildspecs/buildspec.canary.yml b/buildspecs/buildspec.canary.yml deleted file mode 100644 index 35e33c5f..00000000 --- a/buildspecs/buildspec.canary.yml +++ /dev/null @@ -1,21 +0,0 @@ -version: 0.2 - -env: - variables: - AWS_REGION: us-west-2 - parameter-store: - AWS_ACCESS_KEY_ID: AccessKey - AWS_SECRET_ACCESS_KEY: SecretKey -phases: - install: - runtime-versions: - java: corretto11 - commands: - # start docker - # https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files - - nohup /usr/local/bin/dockerd --host=unix:///var/run/docker.sock --host=tcp://127.0.0.1:2375 --storage-driver=overlay2& - - timeout 15 sh -c "until docker info; do echo .; sleep 1; done" - build: - commands: - - ./gradlew build - - ./bin/deploy-canary.sh diff --git a/buildspecs/buildspec.release.yml b/buildspecs/buildspec.release.yml deleted file mode 100644 index 6ac0735d..00000000 --- a/buildspecs/buildspec.release.yml +++ /dev/null @@ -1,17 +0,0 @@ -version: 0.2 - -env: - variables: - AWS_REGION: us-west-2 - parameter-store: - ORG_GRADLE_PROJECT_signingKey: emf_java_signing_key - ORG_GRADLE_PROJECT_signingPassword: emf_java_signing_password - mavenUserName: mavenUserName - mavenPassword: mavenPassword -phases: - install: - runtime-versions: - java: corretto11 - build: - commands: - - ./gradlew publish diff --git a/buildspecs/buildspec.yml b/buildspecs/buildspec.yml deleted file mode 100644 index 31b5b135..00000000 --- a/buildspecs/buildspec.yml +++ /dev/null @@ -1,21 +0,0 @@ -version: 0.2 - -env: - variables: - AWS_REGION: us-west-2 - parameter-store: - AWS_ACCESS_KEY_ID: AccessKey - AWS_SECRET_ACCESS_KEY: SecretKey -phases: - install: - runtime-versions: - java: corretto11 - commands: - # start docker - # https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files - - nohup /usr/local/bin/dockerd --host=unix:///var/run/docker.sock --host=tcp://127.0.0.1:2375 --storage-driver=overlay2& - - timeout 15 sh -c "until docker info; do echo .; sleep 1; done" - build: - commands: - - ./gradlew build - - ./gradlew integ diff --git a/examples/lambda/src/main/java/Handler.java b/examples/lambda/src/main/java/Handler.java index 2c24d653..913dc06a 100644 --- a/examples/lambda/src/main/java/Handler.java +++ b/examples/lambda/src/main/java/Handler.java @@ -7,10 +7,10 @@ import java.util.HashMap; import java.util.Map; -public class Handler implements RequestHandler, String> { +public class Handler implements RequestHandler, String> { @Override - public String handleRequest(Map event, Context context) { + public String handleRequest(Map event, Context context) { String response = "200 OK"; MetricsLogger logger = new MetricsLogger(); diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ac33e994..92f06b50 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java new file mode 100644 index 00000000..7b273a6c --- /dev/null +++ b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java @@ -0,0 +1,349 @@ +package software.amazon.cloudwatchlogs.emf; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.*; +import software.amazon.cloudwatchlogs.emf.environment.Environment; +import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; +import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger; +import software.amazon.cloudwatchlogs.emf.model.DimensionSet; +import software.amazon.cloudwatchlogs.emf.sinks.SinkShunt; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1) +public class MetricsLoggerBenchmark { + private MetricsLogger logger; + private EnvironmentProvider envProvider; + private SinkShunt sink; + private Environment environment; + + @Setup + public void setUp() { + envProvider = mock(EnvironmentProvider.class); + environment = mock(Environment.class); + sink = new SinkShunt(); + + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(sink); + logger = new MetricsLogger(envProvider); + } + + /** + * Publishing 10000 metrics with single thread. no lock: 0.844 ms/op; RW lock: 0.896 ms/op; S + * lock: 0.884 ms/op + */ + @Benchmark + public void measurePutMetric() { + logger = new MetricsLogger(envProvider); // 0.024 ms/op + + // should make this op dominate running time + for (int i = 0; i < 10000; i++) { + logger.putMetric("Metric-" + i, i); + } + } + + /** Flush with single thread. no lock: 0.148 ms/op; RW lock: 0.148 ms/op; S lock: 0.147 ms/op */ + @Benchmark + public void measureFlush() { + logger = new MetricsLogger(envProvider); + + logger.flush(); + + sink.shutdown(); + } + + /** + * Invoke all methods 100 times with single thread. no lock: 6.946 ms/op; RW lock: 6.988 ms/op; + * S lock: 6.823 ms/op + */ + @Benchmark + public void measureAllMethods() { + logger = new MetricsLogger(envProvider); + + for (int j = 0; j < 100; j++) { + logger.putMetadata("MetaData-" + j, j); + logger.putProperty("Property-" + j, j); + logger.putDimensions(DimensionSet.of("Dim-" + j, String.valueOf(j))); + logger.putMetric("Metric-" + j, j); + logger.flush(); + } + + sink.shutdown(); + } + + /** + * Each thread publishes 1000 metrics, 10 threads in total. no lock: 0.949 ms/op; RW lock: 3.823 + * ms/op; S lock: 3.078 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith10Threads() throws InterruptedException { + measurePutMetricWithNThreads(10); + } + + /** + * Each thread publishes 1000 metrics, 20 threads in total. no lock: 1.860 ms/op; RW lock: 9.806 + * ms/op; S lock: 7.929 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith20Threads() throws InterruptedException { + measurePutMetricWithNThreads(20); + } + + /** + * Each thread publishes 1000 metrics, 50 threads in total. no lock: 6.548 ms/op; RW lock: + * 28.754 ms/op; S lock: 24.700 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith50Threads() throws InterruptedException { + measurePutMetricWithNThreads(50); + } + + /** + * Each thread publishes 1000 metrics, 200 threads in total. no lock: 37.662 ms/op; RW lock: + * 135.824 ms/op; S lock: 114.467 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith200Threads() throws InterruptedException { + measurePutMetricWithNThreads(200); + } + + /** + * Each thread publishes 1000 metrics, 500 threads in total. no lock: 90.148 ms/op; RW lock: + * 345.197 ms/op; S lock: 287.908 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measurePutMetricWith500Threads() throws InterruptedException { + measurePutMetricWithNThreads(500); + } + + /** + * Each thread flushes 100 times, 10 threads in total. no lock: 12.900 ms/op; RW lock: 25.015 + * ms/op; S lock: 24.778 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith10Threads() throws InterruptedException { + measureFlushWithNThreads(10); + } + + /** + * Each thread flushes 100 times, 20 threads in total. no lock: 20.824 ms/op; RW lock: 47.123 + * ms/op; S lock: 48.511 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith20Threads() throws InterruptedException { + measureFlushWithNThreads(20); + } + + /** + * Each thread flushes 100 times, 50 threads in total. no lock: 77.463 ms/op; RW lock: 121.857 + * ms/op; S lock: 125.212 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith50Threads() throws InterruptedException { + measureFlushWithNThreads(50); + } + + /** + * Each thread flushes 100 times, 200 threads in total. no lock: 390.252 ms/op; RW lock: 474.439 + * ms/op; S lock: 488.809 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith200Threads() throws InterruptedException { + measureFlushWithNThreads(200); + } + + /** + * Each thread flushes 100 times, 500 threads in total. no lock: 300.280 ms/op; RW lock: + * 1161.098 ms/op; S lock: 1247.972 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureFlushWith500Threads() throws InterruptedException { + measureFlushWithNThreads(500); + } + + /** + * Each thread executes all methods 100 times, 10 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 7.215 ms/op; RW lock: + * 32.159; S lock: 34.226 + * + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith10Threads() throws InterruptedException { + measureAllMethodsWithNThreads(10); + } + + /** + * Each thread executes all methods 100 times, 20 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 11.833 ms/op; RW lock: + * 60.510 ms/op; S lock: 75.125 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith20Threads() throws InterruptedException { + measureAllMethodsWithNThreads(20); + } + + /** + * Each thread executes all methods 100 times, 50 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 36.051 ms/op; RW lock: + * 150.022 ms/op; S lock: 244.934 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith50Threads() throws InterruptedException { + measureAllMethodsWithNThreads(50); + } + + /** + * Each thread executes all methods 100 times, 200 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 108.775 ms/op; RW lock: + * 629.826 ms/op; S lock: 1220.959 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureAllMethodsWith200Threads() throws InterruptedException { + measureAllMethodsWithNThreads(200); + } + + /** + * Each thread executes all methods 100 times, 500 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 335.183 ms/op; RW lock: + * 1741.003 ms/op; S lock: 4192.327 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureAllMethodsWith500Threads() throws InterruptedException { + measureAllMethodsWithNThreads(500); + } + + /** + * Each thread executes all methods 100 times, 1000 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 575.339 ms/op; RW lock: + * 3230.403 ms/op; S lock: 13519.459 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 20) + @Measurement(time = 20) + public void measureAllMethodsWith1000Threads() throws InterruptedException { + measureAllMethodsWithNThreads(1000); + } + + private void measurePutMetricWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + int batchSize = 1000; + + for (int i = 0; i < n; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.putMetric("Metric-" + j, j); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + } + + private void measureFlushWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + int batchSize = 100; + + for (int i = 0; i < n; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.flush(); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + sink.shutdown(); + } + + private void measureAllMethodsWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + int batchSize = 100; + + for (int i = 0; i < n; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.putMetric("Metric-" + j, j); + logger.putProperty("Property-" + j, j); + logger.putMetadata("MetaData-" + j, j); + logger.setDimensions( + DimensionSet.of("Dim-" + j, String.valueOf(j))); + + logger.flush(); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + sink.shutdown(); + } +} diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/config/SystemWrapper.java b/src/main/java/software/amazon/cloudwatchlogs/emf/config/SystemWrapper.java index 91ef3069..4e2716fc 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/config/SystemWrapper.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/config/SystemWrapper.java @@ -18,6 +18,9 @@ /** A wrapper class that can be used to mock 'System.getenv' with PowerMock. */ public class SystemWrapper { + private SystemWrapper() { + throw new IllegalStateException("Utility class"); + } public static String getenv(String name) { return System.getenv(name); diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java index 4306eaa0..5c6f69d1 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java @@ -30,7 +30,7 @@ public abstract class AgentBasedEnvironment implements Environment { private final Configuration config; private ISink sink; - public AgentBasedEnvironment(Configuration config) { + protected AgentBasedEnvironment(Configuration config) { this.config = config; } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java index 459014b1..8f9d6773 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java @@ -18,6 +18,8 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -37,6 +39,13 @@ public class MetricsLogger { private MetricsContext context; private CompletableFuture environmentFuture; private EnvironmentProvider environmentProvider; + /** + * This lock is used to create an internal sync context for flush() method in multi-threaded + * situations. Flush() acquires write lock, other methods (accessing mutable shared data with + * flush()) acquires read lock. This makes sure flush() is executed exclusively, while other + * methods can be executed concurrently. + */ + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); @Getter @Setter private boolean flushPreserveDimensions = true; @@ -72,13 +81,18 @@ public void flush() { environment = environmentProvider.getDefaultEnvironment(); } - ISink sink = environment.getSink(); - configureContextForEnvironment(context, environment); - sink.accept(context); - context = - flushPreserveDimensions - ? context.createCopyWithContext() - : context.createCopyWithContextWithoutDimensions(); + rwl.writeLock().lock(); + try { + ISink sink = environment.getSink(); + configureContextForEnvironment(context, environment); + sink.accept(context); + context = + flushPreserveDimensions + ? context.createCopyWithContext() + : context.createCopyWithContextWithoutDimensions(); + } finally { + rwl.writeLock().unlock(); + } } /** @@ -91,8 +105,11 @@ public void flush() { * @return the current logger */ public MetricsLogger putProperty(String key, Object value) { - this.context.putProperty(key, value); - return this; + return applyReadLock( + () -> { + this.context.putProperty(key, value); + return this; + }); } /** @@ -107,8 +124,11 @@ public MetricsLogger putProperty(String key, Object value) { * @return the current logger */ public MetricsLogger putDimensions(DimensionSet dimensions) { - context.putDimension(dimensions); - return this; + return applyReadLock( + () -> { + context.putDimension(dimensions); + return this; + }); } /** @@ -121,8 +141,11 @@ public MetricsLogger putDimensions(DimensionSet dimensions) { * @return the current logger */ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { - context.setDimensions(dimensionSets); - return this; + return applyReadLock( + () -> { + context.setDimensions(dimensionSets); + return this; + }); } /** @@ -134,8 +157,11 @@ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { * @return the current logger */ public MetricsLogger setDimensions(boolean useDefault, DimensionSet... dimensionSets) { - context.setDimensions(useDefault, dimensionSets); - return this; + return applyReadLock( + () -> { + context.setDimensions(useDefault, dimensionSets); + return this; + }); } /** @@ -146,8 +172,11 @@ public MetricsLogger setDimensions(boolean useDefault, DimensionSet... dimension * @return the current logger */ public MetricsLogger resetDimensions(boolean useDefault) { - context.resetDimensions(useDefault); - return this; + return applyReadLock( + () -> { + context.resetDimensions(useDefault); + return this; + }); } /** @@ -161,8 +190,11 @@ public MetricsLogger resetDimensions(boolean useDefault) { * @return the current logger */ public MetricsLogger putMetric(String key, double value, Unit unit) { - this.context.putMetric(key, value, unit); - return this; + return applyReadLock( + () -> { + this.context.putMetric(key, value, unit); + return this; + }); } /** @@ -175,7 +207,7 @@ public MetricsLogger putMetric(String key, double value, Unit unit) { * @return the current logger */ public MetricsLogger putMetric(String key, double value) { - this.context.putMetric(key, value, Unit.NONE); + this.putMetric(key, value, Unit.NONE); return this; } @@ -190,8 +222,11 @@ public MetricsLogger putMetric(String key, double value) { * @return the current logger */ public MetricsLogger putMetadata(String key, Object value) { - this.context.putMetadata(key, value); - return this; + return applyReadLock( + () -> { + this.context.putMetadata(key, value); + return this; + }); } /** @@ -227,4 +262,13 @@ private void configureContextForEnvironment(MetricsContext context, Environment context.setDefaultDimensions(defaultDimension); environment.configureContext(context); } + + private MetricsLogger applyReadLock(Supplier any) { + rwl.readLock().lock(); + try { + return any.get(); + } finally { + rwl.readLock().unlock(); + } + } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java index 638850e5..3ebe48ae 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java @@ -23,9 +23,9 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.time.Instant; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; @@ -56,7 +56,7 @@ class Metadata { Metadata() { cloudWatchMetrics = new ArrayList<>(); timestamp = Instant.now(); - customFields = new HashMap<>(); + customFields = new ConcurrentHashMap<>(); } /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java index ef49d8d9..e2d8e6d1 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import lombok.*; @@ -45,8 +46,8 @@ class MetricDirective { MetricDirective() { namespace = "aws-embedded-metrics"; - metrics = new HashMap<>(); - dimensions = new ArrayList<>(); + metrics = new ConcurrentHashMap<>(); + dimensions = Collections.synchronizedList(new ArrayList<>()); defaultDimensions = new DimensionSet(); shouldUseDefaultDimension = true; } @@ -59,7 +60,7 @@ class MetricDirective { void putDimensionSet(DimensionSet dimensionSet) { // Duplicate dimensions sets are removed before being added to the end of the collection. // This ensures only latest dimension value is used as a target member on the root EMF node. - // This operation is O(n^2), but acceptable given sets are capped at 10 dimensions + // This operation is O(n^2), but acceptable given sets are capped at 30 dimensions dimensions.removeIf(dim -> dim.getDimensionKeys().equals(dimensionSet.getDimensionKeys())); dimensions.add(dimensionSet); } @@ -69,11 +70,15 @@ void putMetric(String key, double value) { } void putMetric(String key, double value, Unit unit) { - if (metrics.containsKey(key)) { - metrics.get(key).addValue(value); - } else { - metrics.put(key, new MetricDefinition(key, unit, value)); - } + metrics.compute( + key, + (k, v) -> { + if (v == null) return new MetricDefinition(key, unit, value); + else { + v.addValue(value); + return v; + } + }); } @JsonProperty("Metrics") @@ -95,7 +100,7 @@ List> getAllDimensionKeys() { */ void setDimensions(List dimensionSets) { shouldUseDefaultDimension = false; - dimensions = new ArrayList<>(dimensionSets); + dimensions = Collections.synchronizedList(new ArrayList<>(dimensionSets)); } /** @@ -106,7 +111,7 @@ void setDimensions(List dimensionSets) { */ void setDimensions(boolean useDefault, List dimensionSets) { shouldUseDefaultDimension = useDefault; - dimensions = new ArrayList<>(dimensionSets); + dimensions = Collections.synchronizedList(new ArrayList<>(dimensionSets)); } /** @@ -116,7 +121,7 @@ void setDimensions(boolean useDefault, List dimensionSets) { */ void resetDimensions(boolean useDefault) { shouldUseDefaultDimension = useDefault; - dimensions = new ArrayList<>(); + dimensions = Collections.synchronizedList(new ArrayList<>()); } /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java index 65df89d3..18cd5015 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java @@ -91,7 +91,7 @@ public void setDefaultDimensions(DimensionSet dimensionSet) { } public boolean hasDefaultDimensions() { - return getDefaultDimensions().getDimensionKeys().size() > 0; + return !getDefaultDimensions().getDimensionKeys().isEmpty(); } /** @@ -258,7 +258,7 @@ public List serialize() throws JsonProcessingException { Map metrics = new HashMap<>(); Queue metricDefinitions = new LinkedList<>(rootNode.metrics().values()); - while (metricDefinitions.size() > 0) { + while (!metricDefinitions.isEmpty()) { MetricDefinition metric = metricDefinitions.poll(); if (metrics.size() == Constants.MAX_METRICS_PER_EVENT diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java index 50d74e70..fb790322 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.With; @@ -45,7 +46,7 @@ class RootNode { RootNode() { aws = new Metadata(); - properties = new HashMap<>(); + properties = new ConcurrentHashMap<>(); objectMapper.setFilterProvider(filterProvider); } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/InstantSerializer.java b/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/InstantSerializer.java index 854a07c6..f2297e15 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/InstantSerializer.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/InstantSerializer.java @@ -17,7 +17,6 @@ package software.amazon.cloudwatchlogs.emf.serializers; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import java.io.IOException; @@ -35,8 +34,7 @@ public class InstantSerializer extends StdSerializer { @Override public void serialize(Instant value, JsonGenerator jgen, SerializerProvider provider) - throws IOException, JsonProcessingException { - + throws IOException { jgen.writeNumber(value.toEpochMilli()); } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitDeserializer.java b/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitDeserializer.java index 0a54cbdb..8ef87531 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitDeserializer.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitDeserializer.java @@ -17,7 +17,6 @@ package software.amazon.cloudwatchlogs.emf.serializers; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import java.io.IOException; @@ -34,11 +33,8 @@ public class UnitDeserializer extends StdDeserializer { } @Override - public Unit deserialize(JsonParser jp, DeserializationContext ctxt) - throws IOException, JsonProcessingException { - + public Unit deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException { String value = jp.getValueAsString(); - Unit unit = Unit.fromValue(value); - return unit; + return Unit.fromValue(value); } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitSerializer.java b/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitSerializer.java index 99afffac..f9f823a7 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitSerializer.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/serializers/UnitSerializer.java @@ -17,7 +17,6 @@ package software.amazon.cloudwatchlogs.emf.serializers; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import java.io.IOException; @@ -35,8 +34,7 @@ public class UnitSerializer extends StdSerializer { @Override public void serialize(Unit value, JsonGenerator jgen, SerializerProvider provider) - throws IOException, JsonProcessingException { - + throws IOException { String str = value.toString(); jgen.writeString(str); } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/util/IOUtils.java b/src/main/java/software/amazon/cloudwatchlogs/emf/util/IOUtils.java index 3bfe0e6f..ea3cef63 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/util/IOUtils.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/util/IOUtils.java @@ -24,6 +24,10 @@ import org.slf4j.Logger; public class IOUtils { + private IOUtils() { + throw new IllegalStateException("Utility class"); + } + private static final int BUFFER_SIZE = 1024 * 4; /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/util/Jackson.java b/src/main/java/software/amazon/cloudwatchlogs/emf/util/Jackson.java index 369b608f..6b0a9e0f 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/util/Jackson.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/util/Jackson.java @@ -22,6 +22,9 @@ import software.amazon.cloudwatchlogs.emf.exception.EMFClientException; public class Jackson { + private Jackson() { + throw new IllegalStateException("Utility class"); + } private static final ObjectMapper objectMapper = new ObjectMapper(); private static final ObjectWriter writer = objectMapper.writer(); diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/util/StringUtils.java b/src/main/java/software/amazon/cloudwatchlogs/emf/util/StringUtils.java index 1465135c..9d26c4e1 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/util/StringUtils.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/util/StringUtils.java @@ -17,6 +17,9 @@ package software.amazon.cloudwatchlogs.emf.util; public class StringUtils { + private StringUtils() { + throw new IllegalStateException("Utility class"); + } public static boolean isNullOrEmpty(String value) { return value == null || value.isEmpty(); diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java index 1859190a..e5a4bd65 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java @@ -42,8 +42,8 @@ public void testReturnEmptyOrDefaultIfNotSet() { assertFalse(config.getServiceType().isPresent()); assertFalse(config.getServiceName().isPresent()); - assertEquals(config.getEnvironmentOverride(), Environments.Unknown); - assertEquals(config.getAsyncBufferSize(), 100); + assertEquals(Environments.Unknown, config.getEnvironmentOverride()); + assertEquals(100, config.getAsyncBufferSize()); } @Test @@ -60,7 +60,7 @@ public void testReturnEmptyIfStringValueIsBlank() { assertFalse(config.getLogStreamName().isPresent()); assertFalse(config.getServiceType().isPresent()); assertFalse(config.getServiceName().isPresent()); - assertEquals(config.getEnvironmentOverride(), Environments.Unknown); + assertEquals(Environments.Unknown, config.getEnvironmentOverride()); } @Test @@ -81,12 +81,12 @@ public void testReturnCorrectValueAfterSet() { config.setEnvironmentOverride(expectedEnvironment); config.setAsyncBufferSize(expectedAsyncBufferSize); - assertEquals(config.getAgentEndpoint().get(), expectedEndpoint); - assertEquals(config.getLogGroupName().get(), expectedLogGroupName); - assertEquals(config.getLogStreamName().get(), expectedLogStreamName); - assertEquals(config.getServiceType().get(), expectedServiceType); - assertEquals(config.getServiceName().get(), expectedServiceName); - assertEquals(config.getEnvironmentOverride(), expectedEnvironment); - assertEquals(config.getAsyncBufferSize(), expectedAsyncBufferSize); + assertEquals(expectedEndpoint, config.getAgentEndpoint().get()); + assertEquals(expectedLogGroupName, config.getLogGroupName().get()); + assertEquals(expectedLogStreamName, config.getLogStreamName().get()); + assertEquals(expectedServiceType, config.getServiceType().get()); + assertEquals(expectedServiceName, config.getServiceName().get()); + assertEquals(expectedEnvironment, config.getEnvironmentOverride()); + assertEquals(expectedAsyncBufferSize, config.getAsyncBufferSize()); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java index 28c3b017..9e686070 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java @@ -43,13 +43,13 @@ public void getGetConfig() { Configuration config = EnvironmentConfigurationProvider.createConfig(); - assertEquals(config.getServiceName().get(), "TestServiceName"); - assertEquals(config.getServiceType().get(), "TestServiceType"); - assertEquals(config.getLogGroupName().get(), "TestLogGroup"); - assertEquals(config.getLogStreamName().get(), "TestLogStream"); - assertEquals(config.getAgentEndpoint().get(), "Endpoint"); - assertEquals(config.getEnvironmentOverride(), Environments.Agent); - assertEquals(config.getAsyncBufferSize(), 9999); + assertEquals("TestServiceName", config.getServiceName().get()); + assertEquals("TestServiceType", config.getServiceType().get()); + assertEquals("TestLogGroup", config.getLogGroupName().get()); + assertEquals("TestLogStream", config.getLogStreamName().get()); + assertEquals("Endpoint", config.getAgentEndpoint().get()); + assertEquals(Environments.Agent, config.getEnvironmentOverride()); + assertEquals(9999, config.getAsyncBufferSize()); } @Test diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironmentTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironmentTest.java index ecbd6c66..e0aaf9d3 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironmentTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironmentTest.java @@ -39,33 +39,33 @@ public void setUp() { public void testGetName() { String serviceName = "TestService"; when(configuration.getServiceName()).thenReturn(Optional.of(serviceName)); - assertEquals(environment.getName(), serviceName); + assertEquals(serviceName, environment.getName()); } @Test public void testGetNameWhenNotConfigured() { when(configuration.getServiceName()).thenReturn(Optional.empty()); - assertEquals(environment.getName(), "Unknown"); + assertEquals("Unknown", environment.getName()); } @Test public void testGetType() { String serviceType = "TestServiceType"; when(configuration.getServiceType()).thenReturn(Optional.of(serviceType)); - assertEquals(environment.getType(), serviceType); + assertEquals(serviceType, environment.getType()); } @Test public void testGetTypeWhenNotConfigured() { when(configuration.getServiceType()).thenReturn(Optional.empty()); - assertEquals(environment.getType(), "Unknown"); + assertEquals("Unknown", environment.getType()); } @Test public void testGetLogStreamName() { String logStream = "TestLogStream"; when(configuration.getLogStreamName()).thenReturn(Optional.of(logStream)); - assertEquals(environment.getLogStreamName(), logStream); + assertEquals(logStream, environment.getLogStreamName()); } @Test @@ -73,14 +73,14 @@ public void testGetLogStreamNameWhenNotConfigured() { String serviceName = "TestService"; when(configuration.getLogStreamName()).thenReturn(Optional.empty()); when(configuration.getServiceName()).thenReturn(Optional.of(serviceName)); - assertEquals(environment.getLogStreamName(), ""); + assertEquals("", environment.getLogStreamName()); } @Test public void testGetLogGroupName() { String logGroup = "TestLogGroup"; when(configuration.getLogGroupName()).thenReturn(Optional.of(logGroup)); - assertEquals(environment.getLogGroupName(), logGroup); + assertEquals(logGroup, environment.getLogGroupName()); } @Test @@ -88,6 +88,6 @@ public void testGetLogLogNameWhenNotConfigured() { String serviceName = "TestService"; when(configuration.getLogGroupName()).thenReturn(Optional.empty()); when(configuration.getServiceName()).thenReturn(Optional.of(serviceName)); - assertEquals(environment.getLogGroupName(), serviceName + "-metrics"); + assertEquals(serviceName + "-metrics", environment.getLogGroupName()); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/EC2EnvironmentTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/EC2EnvironmentTest.java index bab2f2d6..a68cbb79 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/EC2EnvironmentTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/EC2EnvironmentTest.java @@ -69,7 +69,7 @@ public void testProbeReturnTrue() { public void testGetTypeWhenNoMetadata() { when(fetcher.fetch(any(), any())).thenThrow(new EMFClientException("Invalid URL")); environment.probe(); - assertEquals(environment.getType(), Constants.UNKNOWN); + assertEquals(Constants.UNKNOWN, environment.getType()); } @Test @@ -78,7 +78,7 @@ public void testGetTypeReturnDefined() { when(fetcher.fetch(any(), any(), (Class) any(), any())) .thenReturn(new EC2Environment.EC2Metadata()); environment.probe(); - assertEquals(environment.getType(), "AWS::EC2::Instance"); + assertEquals("AWS::EC2::Instance", environment.getType()); } @Test @@ -87,7 +87,7 @@ public void testGetTypeFromConfiguration() { environment.probe(); String testType = faker.letterify("???"); when(config.getServiceType()).thenReturn(Optional.of(testType)); - assertEquals(environment.getType(), testType); + assertEquals(testType, environment.getType()); } @Test @@ -101,11 +101,11 @@ public void testConfigureContext() { MetricsContext context = new MetricsContext(); environment.configureContext(context); - assertEquals(context.getProperty("imageId"), metadata.getImageId()); - assertEquals(context.getProperty("instanceId"), metadata.getInstanceId()); - assertEquals(context.getProperty("instanceType"), metadata.getInstanceType()); - assertEquals(context.getProperty("privateIp"), metadata.getPrivateIp()); - assertEquals(context.getProperty("availabilityZone"), metadata.getAvailabilityZone()); + assertEquals(metadata.getImageId(), context.getProperty("imageId")); + assertEquals(metadata.getInstanceId(), context.getProperty("instanceId")); + assertEquals(metadata.getInstanceType(), context.getProperty("instanceType")); + assertEquals(metadata.getPrivateIp(), context.getProperty("privateIp")); + assertEquals(metadata.getAvailabilityZone(), context.getProperty("availabilityZone")); } private void getRandomMetadata(EC2Environment.EC2Metadata metadata) { diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironmentTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironmentTest.java index eef757c7..22e80a86 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironmentTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironmentTest.java @@ -85,7 +85,7 @@ public void testFormatImageName() { when(fetcher.fetch(any(), (ObjectMapper) any(), any())).thenReturn(metadata); assertTrue(environment.probe()); - assertEquals(environment.getName(), "testImage:latest"); + assertEquals("testImage:latest", environment.getName()); } @Test @@ -93,25 +93,25 @@ public void testGetNameFromConfig() { String serviceName = "testService"; when(config.getServiceName()).thenReturn(Optional.of(serviceName)); - assertEquals(environment.getName(), serviceName); + assertEquals(serviceName, environment.getName()); } @Test public void testGetNameReturnsUnknown() { when(config.getServiceName()).thenReturn(Optional.empty()); - assertEquals(environment.getName(), Constants.UNKNOWN); + assertEquals(Constants.UNKNOWN, environment.getName()); } @Test public void testGetType() { - assertEquals(environment.getType(), "AWS::ECS::Container"); + assertEquals("AWS::ECS::Container", environment.getType()); } @Test public void testGetTypeFromConfig() { String type = faker.letterify("????"); when(config.getServiceType()).thenReturn(Optional.of(type)); - assertEquals(environment.getType(), type); + assertEquals(type, environment.getType()); } @Test @@ -128,7 +128,7 @@ public void testSetFluentBit() { ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); Mockito.verify(config, times(1)).setAgentEndpoint(argument.capture()); assertEquals( - argument.getValue(), "tcp://" + fluentHost + ":" + Constants.DEFAULT_AGENT_PORT); + "tcp://" + fluentHost + ":" + Constants.DEFAULT_AGENT_PORT, argument.getValue()); } @Test @@ -141,13 +141,12 @@ public void testGetLogGroupNameReturnEmpty() { environment.probe(); - assertEquals(environment.getLogGroupName(), ""); + assertEquals("", environment.getLogGroupName()); } @Test public void testGetLogGroupNameReturnNonEmpty() { - - assertEquals(environment.getLogGroupName(), Constants.UNKNOWN + "-metrics"); + assertEquals(Constants.UNKNOWN + "-metrics", environment.getLogGroupName()); } @Test @@ -172,17 +171,16 @@ public void testConfigureContext() throws UnknownHostException { MetricsContext context = new MetricsContext(); environment.configureContext(context); - assertEquals(context.getProperty("containerId"), InetAddress.getLocalHost().getHostName()); - assertEquals(context.getProperty("createdAt"), metadata.getCreatedAt()); - assertEquals(context.getProperty("startedAt"), metadata.getStartedAt()); + assertEquals(InetAddress.getLocalHost().getHostName(), context.getProperty("containerId")); + assertEquals(metadata.getCreatedAt(), context.getProperty("createdAt")); + assertEquals(metadata.getStartedAt(), context.getProperty("startedAt")); assertEquals( - context.getProperty("cluster"), metadata.labels.get("com.amazonaws.ecs.cluster")); + metadata.labels.get("com.amazonaws.ecs.cluster"), context.getProperty("cluster")); assertEquals( - context.getProperty("taskArn"), metadata.labels.get("com.amazonaws.ecs.task-arn")); + metadata.labels.get("com.amazonaws.ecs.task-arn"), context.getProperty("taskArn")); } private void getRandomMetadata(ECSEnvironment.ECSMetadata metadata) { - metadata.createdAt = faker.date().past(1, TimeUnit.DAYS).toString(); metadata.startedAt = faker.date().past(1, TimeUnit.DAYS).toString(); metadata.image = faker.letterify("?????"); diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironmentTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironmentTest.java index 9d7cf73f..7733662a 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironmentTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironmentTest.java @@ -47,12 +47,12 @@ public void testGetNameReturnFunctionsName() { String expectedName = faker.name().name(); when(SystemWrapper.getenv("AWS_LAMBDA_FUNCTION_NAME")).thenReturn(expectedName); - assertEquals(lambda.getName(), expectedName); + assertEquals(expectedName, lambda.getName()); } @Test public void testGetTypeReturnCFNLambdaName() { - assertEquals(lambda.getType(), "AWS::Lambda::Function"); + assertEquals("AWS::Lambda::Function", lambda.getType()); } @Test @@ -60,7 +60,7 @@ public void testGetLogGroupNameReturnFunctionName() { String expectedName = faker.name().name(); when(SystemWrapper.getenv("AWS_LAMBDA_FUNCTION_NAME")).thenReturn(expectedName); - assertEquals(lambda.getLogGroupName(), expectedName); + assertEquals(expectedName, lambda.getLogGroupName()); } @Test @@ -78,9 +78,9 @@ public void testConfigureContextAddProperties() { lambda.configureContext(mc); - assertEquals(mc.getProperty("executionEnvironment"), expectedEnv); - assertEquals(mc.getProperty("functionVersion"), expectedVersion); - assertEquals(mc.getProperty("logStreamId"), expectedLogName); + assertEquals(expectedEnv, mc.getProperty("executionEnvironment")); + assertEquals(expectedVersion, mc.getProperty("functionVersion")); + assertEquals(expectedLogName, mc.getProperty("logStreamId")); assertNull(mc.getProperty("traceId")); } @@ -93,7 +93,7 @@ public void testContextWithTraceId() { lambda.configureContext(mc); - assertEquals(mc.getProperty("traceId"), expectedTraceId); + assertEquals(expectedTraceId, mc.getProperty("traceId")); } @Test diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironmentTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironmentTest.java index 22a8407f..6705beec 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironmentTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironmentTest.java @@ -41,48 +41,46 @@ public void setUp() { @Test public void testProbeReturnFalse() { - assertFalse(environment.probe()); } @Test public void testGetName() { when(config.getServiceName()).thenReturn(Optional.empty()); - assertEquals(environment.getName(), Constants.UNKNOWN); + assertEquals(Constants.UNKNOWN, environment.getName()); String name = faker.letterify("?????"); when(config.getServiceName()).thenReturn(Optional.of(name)); - assertEquals(environment.getName(), name); + assertEquals(name, environment.getName()); } @Test public void testGetType() { when(config.getServiceType()).thenReturn(Optional.empty()); - assertEquals(environment.getType(), Constants.UNKNOWN); + assertEquals(Constants.UNKNOWN, environment.getType()); String type = faker.letterify("?????"); when(config.getServiceType()).thenReturn(Optional.of(type)); - assertEquals(environment.getType(), type); + assertEquals(type, environment.getType()); } @Test public void testGetLogGroupName() { when(config.getLogGroupName()).thenReturn(Optional.empty()); - assertEquals(environment.getLogGroupName(), Constants.UNKNOWN + "-metrics"); + assertEquals(Constants.UNKNOWN + "-metrics", environment.getLogGroupName()); when(config.getLogGroupName()).thenReturn(Optional.empty()); String serviceName = faker.letterify("?????"); when(config.getServiceName()).thenReturn(Optional.of(serviceName)); - assertEquals(environment.getLogGroupName(), serviceName + "-metrics"); + assertEquals(serviceName + "-metrics", environment.getLogGroupName()); String logGroupName = faker.letterify("?????"); when(config.getLogGroupName()).thenReturn(Optional.of(logGroupName)); - assertEquals(environment.getLogGroupName(), logGroupName); + assertEquals(logGroupName, environment.getLogGroupName()); } @Test public void testGetSink() { - assertTrue(environment.getSink() instanceof ConsoleSink); assertSame(environment.getSink(), environment.getSink()); } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcherTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcherTest.java index 92e20f5a..d3fa53bf 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcherTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcherTest.java @@ -92,8 +92,8 @@ public void testReadDataWith200Response() { generateStub(200, "{\"name\":\"test\",\"size\":10}"); TestData data = fetcher.fetch(endpoint, TestData.class); - assertEquals(data.name, "test"); - assertEquals(data.size, 10); + assertEquals("test", data.name); + assertEquals(10, data.size); } @Test @@ -107,8 +107,8 @@ public void testReadDataWithHeaders200Response() { verify( getRequestedFor(urlEqualTo(endpoint_path)) .withHeader("X-mock-header-key", equalTo("headerValue"))); - assertEquals(data.name, "test"); - assertEquals(data.size, 10); + assertEquals("test", data.name); + assertEquals(10, data.size); } @Test @@ -120,7 +120,7 @@ public void testWithProvidedMethodAndHeadersWith200Response() { verify( putRequestedFor(urlEqualTo(endpoint_path)) .withHeader("X-mock-header-key", equalTo("headerValue"))); - assertEquals(data, "putResponseData"); + assertEquals("putResponseData", data); } @Test @@ -130,8 +130,8 @@ public void testReadCaseInsensitiveDataWith200Response() { objectMapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); TestData data = fetcher.fetch(endpoint, objectMapper, TestData.class); - assertEquals(data.name, "test"); - assertEquals(data.size, 10); + assertEquals("test", data.name); + assertEquals(10, data.size); } @Test diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerTest.java index 332fde84..5756dab9 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerTest.java @@ -58,7 +58,7 @@ public void testPutProperty() { logger.putProperty(propertyName, propertyValue); logger.flush(); - Assert.assertEquals(sink.getContext().getProperty(propertyName), propertyValue); + Assert.assertEquals(propertyValue, sink.getContext().getProperty(propertyName)); } @Test @@ -68,10 +68,10 @@ public void testPutDimension() { logger.putDimensions(DimensionSet.of(dimensionName, dimensionValue)); logger.flush(); - Assert.assertEquals(sink.getContext().getDimensions().size(), 1); + Assert.assertEquals(1, sink.getContext().getDimensions().size()); Assert.assertEquals( - sink.getContext().getDimensions().get(0).getDimensionValue(dimensionName), - dimensionValue); + dimensionValue, + sink.getContext().getDimensions().get(0).getDimensionValue(dimensionName)); } @Test @@ -88,9 +88,9 @@ public void testOverrideDefaultDimensions() { logger.setDimensions(DimensionSet.of(dimensionName, dimensionValue)); logger.flush(); - Assert.assertEquals(sink.getContext().getDimensions().size(), 1); - Assert.assertEquals( - sink.getContext().getDimensions().get(0).getDimensionValue(defaultDimName), null); + Assert.assertEquals(1, sink.getContext().getDimensions().size()); + Assert.assertNull( + sink.getContext().getDimensions().get(0).getDimensionValue(defaultDimName)); } @Test @@ -134,11 +134,11 @@ public void testOverridePreviousDimensions() { logger.setDimensions(DimensionSet.of(dimensionName, dimensionValue)); logger.flush(); - Assert.assertEquals(sink.getContext().getDimensions().size(), 1); - Assert.assertEquals(sink.getContext().getDimensions().get(0).getDimensionKeys().size(), 1); + Assert.assertEquals(1, sink.getContext().getDimensions().size()); + Assert.assertEquals(1, sink.getContext().getDimensions().get(0).getDimensionKeys().size()); Assert.assertEquals( - sink.getContext().getDimensions().get(0).getDimensionValue(dimensionName), - dimensionValue); + dimensionValue, + sink.getContext().getDimensions().get(0).getDimensionValue(dimensionName)); } @Test @@ -163,7 +163,7 @@ public void testSetNamespace() { logger.setNamespace(namespace); logger.flush(); - Assert.assertEquals(sink.getContext().getNamespace(), namespace); + Assert.assertEquals(namespace, sink.getContext().getNamespace()); } @Test @@ -178,7 +178,7 @@ public void testSetTimestamp() { logger.setTimestamp(now); logger.flush(); - Assert.assertEquals(sink.getContext().getTimestamp(), now); + Assert.assertEquals(now, sink.getContext().getTimestamp()); } @Test @@ -249,8 +249,8 @@ public void testNoDefaultDimensions() { logger.flush(); List dimensions = sink.getContext().getDimensions(); - assertTrue(dimensions.size() == 0); - assertTrue(sink.getLogEvents().size() == 1); + assertEquals(0, dimensions.size()); + assertEquals(1, sink.getLogEvents().size()); String logEvent = sink.getLogEvents().get(0); assertTrue(logEvent.contains("\"Dimensions\":[]")); @@ -328,7 +328,7 @@ public void testNoDimensionsAfterSetEmptyDimensionSetWithMultipleFlush() { private void expectDimension(String dimension, String value) { List dimensions = sink.getContext().getDimensions(); - assertEquals(dimensions.size(), 1); - assertEquals(dimensions.get(0).getDimensionValue(dimension), value); + assertEquals(1, dimensions.size()); + assertEquals(value, dimensions.get(0).getDimensionValue(dimension)); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java new file mode 100644 index 00000000..f5afc128 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java @@ -0,0 +1,472 @@ +package software.amazon.cloudwatchlogs.emf.logger; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import software.amazon.cloudwatchlogs.emf.environment.Environment; +import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; +import software.amazon.cloudwatchlogs.emf.model.DimensionSet; +import software.amazon.cloudwatchlogs.emf.model.MetricsContext; +import software.amazon.cloudwatchlogs.emf.model.Unit; +import software.amazon.cloudwatchlogs.emf.serializers.UnitDeserializer; +import software.amazon.cloudwatchlogs.emf.serializers.UnitSerializer; +import software.amazon.cloudwatchlogs.emf.sinks.GroupedSinkShunt; +import software.amazon.cloudwatchlogs.emf.sinks.SinkShunt; + +public class MetricsLoggerThreadSafetyTest { + private volatile MetricsLogger logger; + private EnvironmentProvider envProvider; + private SinkShunt sink; + private Environment environment; + private volatile Throwable throwable = null; + + @Before + public void setUp() { + envProvider = mock(EnvironmentProvider.class); + environment = mock(Environment.class); + sink = new SinkShunt(); + + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(sink); + logger = new MetricsLogger(envProvider); + } + + @Test + public void testConcurrentPutProperty() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_PROPERTY = 1000; + + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_PROPERTY; j++) { + int propertyId = N_PUT_PROPERTY * id + j; + logger.putProperty( + "Property-" + propertyId, + String.valueOf(propertyId)); + } + } catch (Throwable e) { + throwable = e; // ensure no exceptions are thrown + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + for (int i = 0; i < N_THREAD * N_PUT_PROPERTY; i++) { + Assert.assertEquals(sink.getContext().getProperty("Property-" + i), String.valueOf(i)); + } + } + + @Test + public void testConcurrentPutDimension() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_DIMENSIONS = 100; + + logger = new MetricsLogger(envProvider); + // disable default dimensions + logger.resetDimensions(false); + + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_DIMENSIONS; j++) { + int dimensionId = N_PUT_DIMENSIONS * id + j; + logger.putDimensions( + DimensionSet.of( + String.valueOf(dimensionId), + String.valueOf(dimensionId))); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + + List dimensions = sink.getContext().getDimensions(); + // check size + Assert.assertEquals(sink.getContext().getDimensions().size(), N_THREAD * N_PUT_DIMENSIONS); + for (DimensionSet dim : dimensions) { + Assert.assertEquals( + dim.getDimensionKeys().size(), 1); // default dimensions are disabled + } + // check content + Collections.sort( + dimensions, + Comparator.comparingInt( + dim -> Integer.parseInt(dim.getDimensionKeys().iterator().next()))); + for (int i = 0; i < N_THREAD * N_PUT_DIMENSIONS; i++) { + Assert.assertEquals( + dimensions.get(i).getDimensionValue(String.valueOf(i)), String.valueOf(i)); + } + } + + @Test + public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_DIMENSIONS = 100; + + logger = new MetricsLogger(envProvider); + logger.setDimensions(DimensionSet.of("0", "0")); + long targetTimestampToRun = System.currentTimeMillis() + 500; + + Thread[] threads = new Thread[N_THREAD]; + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_DIMENSIONS; j++) { + int dimensionId = N_PUT_DIMENSIONS * id + j + 1; + logger.putDimensions( + DimensionSet.of( + String.valueOf(dimensionId), + String.valueOf(dimensionId))); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + + List dimensions = sink.getContext().getDimensions(); + // check size + Assert.assertEquals( + sink.getContext().getDimensions().size(), N_THREAD * N_PUT_DIMENSIONS + 1); + for (DimensionSet dim : dimensions) { + Assert.assertEquals( + dim.getDimensionKeys().size(), 1); // there are no default dimensions after set + } + // check content + Collections.sort( + dimensions, + Comparator.comparingInt( + dim -> Integer.parseInt(dim.getDimensionKeys().iterator().next()))); + for (int i = 0; i < N_THREAD * N_PUT_DIMENSIONS + 1; i++) { + Assert.assertEquals( + dimensions.get(i).getDimensionValue(String.valueOf(i)), String.valueOf(i)); + } + } + + @Test + public void testConcurrentFlush() throws InterruptedException, JsonProcessingException { + final int N_THREAD = 300; + + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 1000; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + // try to putMetric() and flush() at the same time + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + logger.putMetric("Metric-" + id, id); + logger.flush(); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + ArrayList allMetrics = new ArrayList<>(); + for (List events : groupedSink.getLogEventList()) { + ArrayList metrics = parseAllMetrics(events); + allMetrics.addAll(metrics); + } + + assertEquals(allMetrics.size(), N_THREAD); + for (MetricDefinitionCopy metric : allMetrics) { + assertEquals(metric.getValues().size(), 1); + } + Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); + for (int i = 0; i < N_THREAD; i++) { + assertEquals(allMetrics.get(i).getName(), "Metric-" + i); + assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentFlushAndPutMetric() + throws InterruptedException, JsonProcessingException { + final int N_THREAD = 500; + final int N_PUT_METRIC = 1000; + + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + Random rand = new Random(); + + Thread[] threads = new Thread[N_THREAD]; + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + int randTime = rand.nextInt(1000); + threads[i] = + new Thread( + () -> { + try { + // half threads do putMetric(), half do flush() + // sleep to introduce more chaos in thread ordering + Thread.sleep(randTime); + if (id % 2 == 0) { + for (int j = id * N_PUT_METRIC / 2; + j < id * N_PUT_METRIC / 2 + N_PUT_METRIC; + j++) { + logger.putMetric("Metric-" + j, j); + } + } else { + logger.flush(); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + logger.flush(); + + ArrayList allMetrics = new ArrayList<>(); + for (List events : groupedSink.getLogEventList()) { + ArrayList metrics = parseAllMetrics(events); + allMetrics.addAll(metrics); + } + + assertEquals(allMetrics.size(), N_THREAD * N_PUT_METRIC / 2); + for (MetricDefinitionCopy metric : allMetrics) { + assertEquals(metric.getValues().size(), 1); + } + Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); + for (int i = 0; i < N_THREAD * N_PUT_METRIC / 2; i++) { + assertEquals(allMetrics.get(i).getName(), "Metric-" + i); + assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentFlushAndMethodsOtherThanPutMetric() throws InterruptedException { + final int N_THREAD = 600; + final int N_PUT_DIMENSIONS = 100; + final int N_PUT_PROPERTY = 100; + + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + logger.resetDimensions(false); + Random rand = new Random(); + + Thread[] threads = new Thread[N_THREAD]; + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + int randTime = rand.nextInt(1000); + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(randTime); + if (id < N_THREAD / 3) { + for (int j = id * N_PUT_DIMENSIONS; + j < id * N_PUT_DIMENSIONS + N_PUT_DIMENSIONS; + j++) { + logger.putDimensions( + DimensionSet.of( + String.valueOf(j), String.valueOf(j))); + } + } else if (id < N_THREAD / 3 * 2) { + for (int k = id * N_PUT_PROPERTY; + k < id * N_PUT_PROPERTY + N_PUT_PROPERTY; + k++) { + logger.putProperty("Property-" + k, k); + } + } else { + logger.flush(); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + logger.flush(); + + int contextNum = groupedSink.getContexts().size(); + MetricsContext finalContext = groupedSink.getContexts().get(contextNum - 1); + List dimensions = finalContext.getDimensions(); + + // check dimension size + assertEquals(dimensions.size(), N_THREAD * N_PUT_DIMENSIONS / 3); + for (DimensionSet dim : dimensions) { + Assert.assertEquals(dim.getDimensionKeys().size(), 1); // there are 3 default dimensions + } + // check dimension content + Collections.sort( + dimensions, + Comparator.comparingInt( + dim -> Integer.parseInt(dim.getDimensionKeys().iterator().next()))); + for (int i = 0; i < N_THREAD * N_PUT_DIMENSIONS / 3; i++) { + Assert.assertEquals( + dimensions.get(i).getDimensionValue(String.valueOf(i)), String.valueOf(i)); + } + + // check property + int propertyCnt = 0; + for (MetricsContext mc : groupedSink.getContexts()) { + for (int i = N_THREAD * N_PUT_PROPERTY / 3; + i < N_THREAD * N_PUT_PROPERTY / 3 * 2; + i++) { + propertyCnt += mc.getProperty("Property-" + i) == null ? 0 : 1; + } + } + assertEquals(propertyCnt, N_THREAD * N_PUT_PROPERTY / 3); + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + throwable = null; // reset throwable to prevent repeat throwing + } + + private Map parseRootNode(String event) throws JsonProcessingException { + return new JsonMapper().readValue(event, new TypeReference>() {}); + } + + @SuppressWarnings("unchecked") + // can parse all metrics even if metric number exceeds MAX_METRICS_PER_EVENT + private ArrayList parseAllMetrics(List events) + throws JsonProcessingException { + ArrayList metricDefinitions = new ArrayList<>(); + for (String event : events) { + Map rootNode = parseRootNode(event); + Map metadata = (Map) rootNode.get("_aws"); + + if (metadata == null) { + continue; + } + + ArrayList> metricDirectives = + (ArrayList>) metadata.get("CloudWatchMetrics"); + ArrayList> metrics = + (ArrayList>) metricDirectives.get(0).get("Metrics"); + + for (Map metric : metrics) { + String name = metric.get("Name"); + Unit unit = Unit.fromValue(metric.get("Unit")); + Object value = rootNode.get(name); + if (value instanceof ArrayList) { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (ArrayList) value)); + } else { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (double) value)); + } + } + } + + return metricDefinitions; + } + + @AllArgsConstructor + private static class MetricDefinitionCopy { + @NonNull + @Getter + @JsonProperty("Name") + private String name; + + @Getter + @JsonProperty("Unit") + @JsonSerialize(using = UnitSerializer.class) + @JsonDeserialize(using = UnitDeserializer.class) + private Unit unit; + + @JsonIgnore @NonNull @Getter private List values; + + MetricDefinitionCopy(String name) { + this(name, Unit.NONE, new ArrayList<>()); + } + + MetricDefinitionCopy(String name, double value) { + this(name, Unit.NONE, value); + } + + MetricDefinitionCopy(String name, Unit unit, double value) { + this(name, unit, new ArrayList<>(Arrays.asList(value))); + } + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetadataTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetadataTest.java index 3d1c3c1e..a45bdb7f 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetadataTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetadataTest.java @@ -39,9 +39,9 @@ public void testSerializeMetadata() throws JsonProcessingException { Map metadata_map = objectMapper.readValue(output, new TypeReference>() {}); - assertEquals(metadata_map.keySet().size(), 2); - assertEquals(metadata_map.get("Timestamp"), now.toEpochMilli()); - assertEquals(metadata_map.get("CloudWatchMetrics"), new ArrayList()); + assertEquals(2, metadata_map.keySet().size()); + assertEquals(now.toEpochMilli(), metadata_map.get("Timestamp")); + assertEquals(new ArrayList<>(), metadata_map.get("CloudWatchMetrics")); } @Test @@ -59,9 +59,9 @@ public void testSerializeMetadataWithCustomValue() throws JsonProcessingExceptio Map metadata_map = objectMapper.readValue(output, new TypeReference>() {}); - assertEquals(metadata_map.keySet().size(), 3); - assertEquals(metadata_map.get("Timestamp"), now.toEpochMilli()); - assertEquals(metadata_map.get("CloudWatchMetrics"), new ArrayList()); - assertEquals(metadata_map.get(property), expectedValue); + assertEquals(3, metadata_map.keySet().size()); + assertEquals(now.toEpochMilli(), metadata_map.get("Timestamp")); + assertEquals(new ArrayList<>(), metadata_map.get("CloudWatchMetrics")); + assertEquals(expectedValue, metadata_map.get(property)); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDefinitionTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDefinitionTest.java index 2524eab4..496262ae 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDefinitionTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDefinitionTest.java @@ -20,7 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.Arrays; +import java.util.List; import org.junit.Test; public class MetricDefinitionTest { @@ -36,7 +36,7 @@ public void testSerializeMetricDefinitionWithoutUnit() throws JsonProcessingExce MetricDefinition metricDefinition = new MetricDefinition("Time"); String metricString = objectMapper.writeValueAsString(metricDefinition); - assertEquals(metricString, "{\"Name\":\"Time\",\"Unit\":\"None\"}"); + assertEquals("{\"Name\":\"Time\",\"Unit\":\"None\"}", metricString); } @Test @@ -45,15 +45,15 @@ public void testSerializeMetricDefinition() throws JsonProcessingException { MetricDefinition metricDefinition = new MetricDefinition("Time", Unit.MILLISECONDS, 10); String metricString = objectMapper.writeValueAsString(metricDefinition); - assertEquals(metricString, "{\"Name\":\"Time\",\"Unit\":\"Milliseconds\"}"); + assertEquals("{\"Name\":\"Time\",\"Unit\":\"Milliseconds\"}", metricString); } @Test public void testAddValue() { MetricDefinition md = new MetricDefinition("Time", Unit.MICROSECONDS, 10); - assertEquals(Arrays.asList(10d), md.getValues()); + assertEquals(List.of(10d), md.getValues()); md.addValue(20); - assertEquals(Arrays.asList(10d, 20d), md.getValues()); + assertEquals(List.of(10d, 20d), md.getValues()); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveTest.java index 31ab8d94..8234f369 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveTest.java @@ -35,8 +35,8 @@ public void testDefaultNamespace() throws JsonProcessingException { String serializedMetricDirective = objectMapper.writeValueAsString(metricDirective); assertEquals( - serializedMetricDirective, - "{\"Dimensions\":[[]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}"); + "{\"Dimensions\":[[]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}", + serializedMetricDirective); } @Test @@ -47,8 +47,8 @@ public void testSetNamespace() throws JsonProcessingException { String serializedMetricDirective = objectMapper.writeValueAsString(metricDirective); assertEquals( - serializedMetricDirective, - "{\"Dimensions\":[[]],\"Metrics\":[],\"Namespace\":\"test-lambda-metrics\"}"); + "{\"Dimensions\":[[]],\"Metrics\":[],\"Namespace\":\"test-lambda-metrics\"}", + serializedMetricDirective); } @Test @@ -59,8 +59,8 @@ public void testPutMetric() throws JsonProcessingException { String serializedMetricDirective = objectMapper.writeValueAsString(metricDirective); assertEquals( - serializedMetricDirective, - "{\"Dimensions\":[[]],\"Metrics\":[{\"Name\":\"Time\",\"Unit\":\"None\"}],\"Namespace\":\"aws-embedded-metrics\"}"); + "{\"Dimensions\":[[]],\"Metrics\":[{\"Name\":\"Time\",\"Unit\":\"None\"}],\"Namespace\":\"aws-embedded-metrics\"}", + serializedMetricDirective); } @Test @@ -71,21 +71,21 @@ public void testPutSameMetricMultipleTimes() { assertEquals(1, metricDirective.getAllMetrics().size()); MetricDefinition[] mds = metricDirective.getAllMetrics().toArray(new MetricDefinition[0]); - assertEquals(mds[0].getValues(), Arrays.asList(10d, 20d)); + assertEquals(Arrays.asList(10d, 20d), mds[0].getValues()); } @Test public void testPutMetricWithoutUnit() { MetricDirective metricDirective = new MetricDirective(); metricDirective.putMetric("Time", 10); - assertEquals(metricDirective.getMetrics().get("Time").getUnit(), Unit.NONE); + assertEquals(Unit.NONE, metricDirective.getMetrics().get("Time").getUnit()); } @Test public void testPutMetricWithUnit() { MetricDirective metricDirective = new MetricDirective(); metricDirective.putMetric("Time", 10, Unit.MILLISECONDS); - assertEquals(metricDirective.getMetrics().get("Time").getUnit(), Unit.MILLISECONDS); + assertEquals(Unit.MILLISECONDS, metricDirective.getMetrics().get("Time").getUnit()); } @Test @@ -97,8 +97,8 @@ public void testPutDimensions() throws JsonProcessingException { String serializedMetricDirective = objectMapper.writeValueAsString(metricDirective); assertEquals( - serializedMetricDirective, - "{\"Dimensions\":[[\"Region\",\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}"); + "{\"Dimensions\":[[\"Region\",\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}", + serializedMetricDirective); } @Test @@ -110,8 +110,8 @@ public void testPutDimensionSetWhenMultipleDimensionSets() throws JsonProcessing String serializedMetricDirective = objectMapper.writeValueAsString(metricDirective); assertEquals( - serializedMetricDirective, - "{\"Dimensions\":[[\"Region\"],[\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}"); + "{\"Dimensions\":[[\"Region\"],[\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}", + serializedMetricDirective); } @Test @@ -184,8 +184,8 @@ public void testPutDimensionsWhenDefaultDimensionsDefined() throws JsonProcessin String serializedMetricDirective = objectMapper.writeValueAsString(metricDirective); assertEquals( - serializedMetricDirective, - "{\"Dimensions\":[[\"Version\",\"Region\"],[\"Version\",\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}"); + "{\"Dimensions\":[[\"Version\",\"Region\"],[\"Version\",\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}", + serializedMetricDirective); } @Test @@ -198,7 +198,7 @@ public void testPutDimensionsAfterSetDimensions() throws JsonProcessingException String serializedMetricDirective = objectMapper.writeValueAsString(metricDirective); assertEquals( - serializedMetricDirective, - "{\"Dimensions\":[[\"Version\"],[\"Region\"],[\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}"); + "{\"Dimensions\":[[\"Version\"],[\"Region\"],[\"Instance\"]],\"Metrics\":[],\"Namespace\":\"aws-embedded-metrics\"}", + serializedMetricDirective); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java new file mode 100644 index 00000000..7eb1f873 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java @@ -0,0 +1,101 @@ +package software.amazon.cloudwatchlogs.emf.model; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import org.junit.After; +import org.junit.Test; + +public class MetricDirectiveThreadSafetyTest { + private volatile Throwable throwable = null; + + @Test + public void testConcurrentPutMetricWithDifferentKey() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_METRIC = 1000; + + MetricDirective metricDirective = new MetricDirective(); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = + System.currentTimeMillis() + + 500; // all threads should target running on this timestamp + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep( + targetTimestampToRun + - System.currentTimeMillis()); // try to make + // all threads + // run at same + // time + for (int j = 0; j < N_PUT_METRIC; j++) { + int metricId = N_PUT_METRIC * id + j; + metricDirective.putMetric("Metric-" + metricId, metricId); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(metricDirective.getAllMetrics().size(), N_THREAD * N_PUT_METRIC); + for (int i = 0; i < N_THREAD * N_PUT_METRIC; i++) { + assertEquals( + metricDirective.getMetrics().get("Metric-" + i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentPutMetricWithSameKey() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_METRIC = 1000; + + MetricDirective metricDirective = new MetricDirective(); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_METRIC; j++) { + int metricId = N_PUT_METRIC * id + j; + metricDirective.putMetric("Metric", metricId); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(metricDirective.getAllMetrics().size(), 1); + MetricDefinition md = metricDirective.getAllMetrics().toArray(new MetricDefinition[0])[0]; + Collections.sort(md.getValues()); + for (int i = 0; i < N_THREAD * N_PUT_METRIC; i++) { + assertEquals(md.getValues().get(i), i, 1e-5); + } + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + throwable = null; + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java index 703a87c5..b1913c9a 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.json.JsonMapper; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import org.junit.Test; @@ -100,7 +99,7 @@ public void testSerializeAMetricWith101DataPoints() throws JsonProcessingExcepti expectedValues.add((double) i); } assertEquals(expectedValues, allMetrics.get(0).getValues()); - assertTrue(allMetrics.get(1).getValues().equals(Arrays.asList(100.0))); + assertEquals(List.of(100.0), allMetrics.get(1).getValues()); } @Test @@ -126,10 +125,10 @@ public void testSerializeMetricsWith101DataPoints() throws JsonProcessingExcepti expectedValues.add((double) i); } assertEquals(expectedValues, metricsFromEvent1.get(0).getValues()); - assertEquals(Arrays.asList(2.0), metricsFromEvent1.get(1).getValues()); + assertEquals(List.of(2.0), metricsFromEvent1.get(1).getValues()); assertEquals(1, metricsFromEvent2.size()); - assertEquals(Arrays.asList(100.0), metricsFromEvent2.get(0).getValues()); + assertEquals(List.of(100.0), metricsFromEvent2.get(0).getValues()); } @Test @@ -165,7 +164,17 @@ public void testSetTimestamp() throws JsonProcessingException { Map metadata = (Map) rootNode.get("_aws"); assertTrue(metadata.containsKey("Timestamp")); - assertEquals(metadata.get("Timestamp"), now.toEpochMilli()); + assertEquals(now.toEpochMilli(), metadata.get("Timestamp")); + } + + @Test + public void testPutMetadata() { + MetricsContext mc = new MetricsContext(); + mc.putMetadata("Metadata", "MetadataValue"); + + Map customFields = mc.getRootNode().getAws().getCustomMetadata(); + assertEquals(customFields.size(), 1); + assertEquals(customFields.get("Metadata"), "MetadataValue"); } @SuppressWarnings("unchecked") diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java new file mode 100644 index 00000000..6ea33883 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java @@ -0,0 +1,55 @@ +package software.amazon.cloudwatchlogs.emf.model; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import org.junit.After; +import org.junit.Test; + +public class MetricsContextThreadSafetyTest { + private volatile Throwable throwable = null; + + @Test + public void testConcurrentPutMetaData() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_METADATA = 1000; + + MetricsContext mc = new MetricsContext(); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_METADATA; j++) { + int metaDataId = N_PUT_METADATA * id + j; + mc.putMetadata("MetaData-" + metaDataId, metaDataId); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + Map metaData = mc.getRootNode().getAws().getCustomMetadata(); + assertEquals(metaData.size(), N_THREAD * N_PUT_METADATA); + for (int i = 0; i < N_THREAD * N_PUT_METADATA; i++) { + assertEquals(metaData.get("MetaData-" + i), i); + } + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + throwable = null; + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/RootNodeTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/RootNodeTest.java index 0e91f544..d72e8d71 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/RootNodeTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/RootNodeTest.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.Arrays; import java.util.List; import java.util.Map; import org.junit.Test; @@ -34,7 +33,7 @@ public void testPutProperty() { RootNode rootNode = new RootNode(); rootNode.putProperty("Property", "Value"); - assertEquals(rootNode.getTargetMembers().get("Property"), "Value"); + assertEquals("Value", rootNode.getTargetMembers().get("Property")); } @Test @@ -43,7 +42,7 @@ public void testPutSamePropertyMultipleTimes() { rootNode.putProperty("Property", "Value"); rootNode.putProperty("Property", "NewValue"); - assertEquals(rootNode.getTargetMembers().get("Property"), "NewValue"); + assertEquals("NewValue", rootNode.getTargetMembers().get("Property")); } @Test @@ -52,7 +51,7 @@ public void testGetDimension() { MetricDirective metricDirective = rootNode.getAws().createMetricDirective(); metricDirective.putDimensionSet(DimensionSet.of("Dim1", "DimValue1")); - assertEquals(rootNode.getTargetMembers().get("Dim1"), "DimValue1"); + assertEquals("DimValue1", rootNode.getTargetMembers().get("Dim1")); } @Test @@ -70,10 +69,10 @@ public void testGetTargetMembers() { mc.putProperty("Prop1", "PropValue1"); - assertEquals(rootNode.getTargetMembers().get("Count"), Arrays.asList(10.0, 20.0)); - assertEquals(rootNode.getTargetMembers().get("Latency"), 100.0); - assertEquals(rootNode.getTargetMembers().get("Dim1"), "DimVal1"); - assertEquals(rootNode.getTargetMembers().get("Prop1"), "PropValue1"); + assertEquals(List.of(10.0, 20.0), rootNode.getTargetMembers().get("Count")); + assertEquals(100.0, rootNode.getTargetMembers().get("Latency")); + assertEquals("DimVal1", rootNode.getTargetMembers().get("Dim1")); + assertEquals("PropValue1", rootNode.getTargetMembers().get("Prop1")); } @SuppressWarnings("unchecked") @@ -92,11 +91,11 @@ public void testSerializeRootNode() throws JsonProcessingException { objectMapper.readValue( emf_logs.get(0), new TypeReference>() {}); - assertEquals(emf_map.keySet().size(), 5); - assertEquals(emf_map.get("Region"), "us-east-1"); - assertEquals(emf_map.get("Property"), "PropertyValue"); - assertEquals(emf_map.get("DefaultDim"), "DefaultDimValue"); - assertEquals(emf_map.get("Count"), 10.0); + assertEquals(5, emf_map.keySet().size()); + assertEquals("us-east-1", emf_map.get("Region")); + assertEquals("PropertyValue", emf_map.get("Property")); + assertEquals("DefaultDimValue", emf_map.get("DefaultDim")); + assertEquals(10.0, emf_map.get("Count")); Map metadata = (Map) emf_map.get("_aws"); assertTrue(metadata.containsKey("Timestamp")); @@ -110,6 +109,6 @@ public void testSerializeRootNodeWithoutAnyMetrics() throws JsonProcessingExcept String value = "bar"; root.putProperty(property, value); - assertEquals(root.serialize(), "{\"foo\":\"bar\"}"); + assertEquals("{\"foo\":\"bar\"}", root.serialize()); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java index 54efb24a..4970ca42 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java @@ -72,10 +72,10 @@ public void testAccept() throws JsonProcessingException { new TypeReference>() {}); Map metadata = (Map) emf_map.get("_aws"); - assertEquals(emf_map.get(prop), propValue); - assertEquals(emf_map.get("Time"), 10.0); - assertEquals(metadata.get("LogGroupName"), logGroupName); - assertEquals(metadata.get("LogStreamName"), logStreamName); + assertEquals(propValue, emf_map.get(prop)); + assertEquals(10.0, emf_map.get("Time")); + assertEquals(logGroupName, metadata.get("LogGroupName")); + assertEquals(logStreamName, metadata.get("LogStreamName")); } @Test diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/EndpointTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/EndpointTest.java index 62af645e..77f4de42 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/EndpointTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/EndpointTest.java @@ -44,7 +44,7 @@ public void testReturnDefaultEndpointForInvalidURI() { Endpoint endpoint = Endpoint.fromURL(unsupportedEndpoint); Endpoint endpointFromEmptyString = Endpoint.fromURL(""); - assertEquals(endpoint, Endpoint.DEFAULT_TCP_ENDPOINT); - assertEquals(endpointFromEmptyString, Endpoint.DEFAULT_TCP_ENDPOINT); + assertEquals(Endpoint.DEFAULT_TCP_ENDPOINT, endpoint); + assertEquals(Endpoint.DEFAULT_TCP_ENDPOINT, endpointFromEmptyString); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java new file mode 100644 index 00000000..d39776e3 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java @@ -0,0 +1,57 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.cloudwatchlogs.emf.sinks; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import software.amazon.cloudwatchlogs.emf.model.MetricsContext; + +/** + * A mocked sink which can preserve all flushed log events. Useful for testing the result of + * concurrent flushing. + */ +public class GroupedSinkShunt implements ISink { + + private List contexts = new ArrayList<>(); + + private List> logEventList = new ArrayList<>(); + + @Override + public void accept(MetricsContext context) { + this.contexts.add(context); + try { + List logEvent = context.serialize(); + logEventList.add(logEvent); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public CompletableFuture shutdown() { + return CompletableFuture.completedFuture(null); + } + + public List getContexts() { + return contexts; + } + + public List> getLogEventList() { + return this.logEventList; + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/ProtocolTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/ProtocolTest.java index d694c0e4..2e96a66e 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/ProtocolTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/ProtocolTest.java @@ -24,16 +24,16 @@ public class ProtocolTest { @Test public void testParseTCP() { - assertEquals(Protocol.getProtocol("TCP"), Protocol.TCP); - assertEquals(Protocol.getProtocol("tcp"), Protocol.TCP); - assertEquals(Protocol.getProtocol("Tcp"), Protocol.TCP); + assertEquals(Protocol.TCP, Protocol.getProtocol("TCP")); + assertEquals(Protocol.TCP, Protocol.getProtocol("tcp")); + assertEquals(Protocol.TCP, Protocol.getProtocol("Tcp")); } @Test public void testParseUDP() { - assertEquals(Protocol.getProtocol("UDP"), Protocol.UDP); - assertEquals(Protocol.getProtocol("udp"), Protocol.UDP); - assertEquals(Protocol.getProtocol("Udp"), Protocol.UDP); + assertEquals(Protocol.UDP, Protocol.getProtocol("UDP")); + assertEquals(Protocol.UDP, Protocol.getProtocol("udp")); + assertEquals(Protocol.UDP, Protocol.getProtocol("Udp")); } @Test(expected = IllegalArgumentException.class) diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/util/IOUtilsTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/util/IOUtilsTest.java index 8b6170c0..662e9497 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/util/IOUtilsTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/util/IOUtilsTest.java @@ -36,7 +36,7 @@ public void testToString() throws IOException { String str = faker.letterify("?????"); ByteArrayInputStream is = new ByteArrayInputStream(str.getBytes()); - assertEquals(IOUtils.toString(is), str); + assertEquals(str, IOUtils.toString(is)); } @Test @@ -44,7 +44,7 @@ public void testToByteArray() throws IOException { String str = faker.letterify("?????"); ByteArrayInputStream is = new ByteArrayInputStream(str.getBytes()); - assertArrayEquals(IOUtils.toByteArray(is), str.getBytes()); + assertArrayEquals(str.getBytes(), IOUtils.toByteArray(is)); } @Test