diff --git a/build.gradle b/build.gradle index 59941a077..3a60f1f05 100644 --- a/build.gradle +++ b/build.gradle @@ -343,7 +343,7 @@ subprojects { check.dependsOn("testModules") - if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp + if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw', 'micrometer-java21'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp apply plugin: 'me.champeau.gradle.japicmp' apply plugin: 'de.undercouch.download' diff --git a/micrometer-java21/build.gradle b/micrometer-java21/build.gradle new file mode 100644 index 000000000..c8700350f --- /dev/null +++ b/micrometer-java21/build.gradle @@ -0,0 +1,40 @@ +description 'Micrometer core classes that require Java 21' + +// skip this module when building with jdk <21 +if (!javaLanguageVersion.canCompileOrRun(21)) { + project.tasks.configureEach { task -> task.enabled = false } +} + +dependencies { + api project(':micrometer-core') + + testImplementation 'org.junit.jupiter:junit-jupiter' + testImplementation 'org.assertj:assertj-core' + testImplementation 'org.awaitility:awaitility' +} + +java { + targetCompatibility = 21 +} + +tasks.withType(JavaCompile).configureEach { + sourceCompatibility = JavaVersion.VERSION_21 + targetCompatibility = JavaVersion.VERSION_21 + options.release = 21 +} + +task reflectiveTests(type: Test) { + useJUnitPlatform { + includeTags 'reflective' + } + + // This hack is needed since VirtualThreadMetricsReflectiveTests utilizes reflection against java.lang, see its javadoc + jvmArgs += ['--add-opens', 'java.base/java.lang=ALL-UNNAMED'] +} + +test { + dependsOn reflectiveTests + useJUnitPlatform { + excludeTags 'reflective' + } +} diff --git a/micrometer-java21/src/main/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetrics.java b/micrometer-java21/src/main/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetrics.java new file mode 100644 index 000000000..97eb5f558 --- /dev/null +++ b/micrometer-java21/src/main/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetrics.java @@ -0,0 +1,107 @@ +/* + * Copyright 2024 VMware, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.java21.instrument.binder.jdk; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.binder.MeterBinder; +import jdk.jfr.consumer.RecordingStream; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Objects; + +import static java.util.Collections.emptyList; + +/** + * Instrumentation support for Virtual Threads, see: + * https://openjdk.org/jeps/425#JDK-Flight-Recorder-JFR + * + * @author Artyom Gabeev + * @since 1.14.0 + */ +public class VirtualThreadMetrics implements MeterBinder, Closeable { + + private static final String PINNED_EVENT = "jdk.VirtualThreadPinned"; + + private static final String SUBMIT_FAILED_EVENT = "jdk.VirtualThreadSubmitFailed"; + + private final RecordingStream recordingStream; + + private final Iterable tags; + + public VirtualThreadMetrics() { + this(new RecordingConfig(), emptyList()); + } + + public VirtualThreadMetrics(Iterable tags) { + this(new RecordingConfig(), tags); + } + + private VirtualThreadMetrics(RecordingConfig config, Iterable tags) { + this.recordingStream = createRecordingStream(config); + this.tags = tags; + } + + @Override + public void bindTo(MeterRegistry registry) { + Timer pinnedTimer = Timer.builder("jvm.threads.virtual.pinned") + .description("The duration while the virtual thread was pinned without releasing its platform thread") + .tags(tags) + .register(registry); + + Counter submitFailedCounter = Counter.builder("jvm.threads.virtual.submit.failed") + .description("The number of events when starting or unparking a virtual thread failed") + .tags(tags) + .register(registry); + + recordingStream.onEvent(PINNED_EVENT, event -> pinnedTimer.record(event.getDuration())); + recordingStream.onEvent(SUBMIT_FAILED_EVENT, event -> submitFailedCounter.increment()); + } + + private RecordingStream createRecordingStream(RecordingConfig config) { + RecordingStream recordingStream = new RecordingStream(); + recordingStream.enable(PINNED_EVENT).withThreshold(config.pinnedThreshold); + recordingStream.enable(SUBMIT_FAILED_EVENT); + recordingStream.setMaxAge(config.maxAge); + recordingStream.setMaxSize(config.maxSizeBytes); + recordingStream.startAsync(); + + return recordingStream; + } + + @Override + public void close() { + recordingStream.close(); + } + + private record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold) { + private RecordingConfig() { + this(Duration.ofSeconds(5), 10L * 1024 * 1024, Duration.ofMillis(20)); + } + + private RecordingConfig { + Objects.requireNonNull(maxAge, "maxAge parameter must not be null"); + Objects.requireNonNull(pinnedThreshold, "pinnedThreshold must not be null"); + if (maxSizeBytes < 0) { + throw new IllegalArgumentException("maxSizeBytes must be positive"); + } + } + } + +} diff --git a/micrometer-java21/src/main/java/io/micrometer/java21/instrument/binder/jdk/package-info.java b/micrometer-java21/src/main/java/io/micrometer/java21/instrument/binder/jdk/package-info.java new file mode 100644 index 000000000..754ded751 --- /dev/null +++ b/micrometer-java21/src/main/java/io/micrometer/java21/instrument/binder/jdk/package-info.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 VMware, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Instrumentation of JDK classes. + */ +@NonNullApi +@NonNullFields +package io.micrometer.java21.instrument.binder.jdk; + +import io.micrometer.common.lang.NonNullApi; +import io.micrometer.common.lang.NonNullFields; diff --git a/micrometer-java21/src/test/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetricsReflectiveTests.java b/micrometer-java21/src/test/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetricsReflectiveTests.java new file mode 100644 index 000000000..e1613d86f --- /dev/null +++ b/micrometer-java21/src/test/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetricsReflectiveTests.java @@ -0,0 +1,118 @@ +/* + * Copyright 2024 VMware, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.java21.instrument.binder.jdk; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Constructor; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.concurrent.locks.LockSupport; + +import static java.lang.Thread.State.WAITING; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +/** + * Tests for {@link VirtualThreadMetrics}. If you run these tests from your IDE, + * {@link #submitFailedEventsShouldBeRecorded()} might fail depending on your setup. This + * is because the test (through {@link #virtualThreadFactoryFor(Executor)}) utilizes + * reflection against the {@code java.lang} package which needs to be explicitly enabled. + * If you run into such an issue you can either change your setup and let your IDE run the + * tests utilizing the build system (Gradle) or add the following JVM arg to your test + * config: {@code --add-opens java.base/java.lang=ALL-UNNAMED} + * + * @author Artyom Gabeev + * @author Jonatan Ivanov + */ +@Tag("reflective") +class VirtualThreadMetricsReflectiveTests { + + private static final Tags TAGS = Tags.of("k", "v"); + + private SimpleMeterRegistry registry; + + private VirtualThreadMetrics virtualThreadMetrics; + + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + virtualThreadMetrics = new VirtualThreadMetrics(TAGS); + virtualThreadMetrics.bindTo(registry); + } + + @AfterEach + void tearDown() { + virtualThreadMetrics.close(); + } + + /** + * Uses a similar approach as the JDK tests to make starting or unparking a virtual + * thread fail, see {@link #virtualThreadFactoryFor(Executor)} and JfrEvents.java + */ + @Test + void submitFailedEventsShouldBeRecorded() { + try (ExecutorService cachedPool = Executors.newCachedThreadPool()) { + ThreadFactory factory = virtualThreadFactoryFor(cachedPool); + Thread thread = factory.newThread(LockSupport::park); + thread.start(); + + await().atMost(Duration.ofSeconds(2)).until(() -> thread.getState() == WAITING); + cachedPool.shutdown(); + + // unpark, the pool was shut down, this should fail + assertThatThrownBy(() -> LockSupport.unpark(thread)).isInstanceOf(RejectedExecutionException.class); + + Counter counter = registry.get("jvm.threads.virtual.submit.failed").tags(TAGS).counter(); + await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 1); + + // park, the pool was shut down, this should fail + assertThatThrownBy(() -> factory.newThread(LockSupport::park).start()) + .isInstanceOf(RejectedExecutionException.class); + await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 2); + } + } + + /** + * Creates a {@link ThreadFactory} for virtual threads. The created virtual threads + * will be bound to the provided platform thread pool instead of a default + * ForkJoinPool. At its current form, this is a hack, it utilizes reflection to supply + * the platform thread pool. It seems though there is no other way of doing this, the + * JDK tests are also utilizing reflection to do the same, see: VThreadScheduler.java + * @param pool platform pool + * @return virtual thread factory bound to the provided platform pool + */ + private static ThreadFactory virtualThreadFactoryFor(Executor pool) { + try { + Class clazz = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder"); + Constructor constructor = clazz.getDeclaredConstructor(Executor.class); + constructor.setAccessible(true); + return ((Thread.Builder.OfVirtual) constructor.newInstance(pool)).factory(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/micrometer-java21/src/test/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetricsTests.java b/micrometer-java21/src/test/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetricsTests.java new file mode 100644 index 000000000..1cc0b0e58 --- /dev/null +++ b/micrometer-java21/src/test/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetricsTests.java @@ -0,0 +1,115 @@ +/* + * Copyright 2024 VMware, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.java21.instrument.binder.jdk; + +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Tests for {@link VirtualThreadMetrics}. + * + * @author Artyom Gabeev + * @author Jonatan Ivanov + */ +class VirtualThreadMetricsTests { + + private static final Tags TAGS = Tags.of("k", "v"); + + private SimpleMeterRegistry registry; + + private VirtualThreadMetrics virtualThreadMetrics; + + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + virtualThreadMetrics = new VirtualThreadMetrics(TAGS); + virtualThreadMetrics.bindTo(registry); + } + + @AfterEach + void tearDown() { + virtualThreadMetrics.close(); + } + + @Test + void pinnedEventsShouldBeRecorded() { + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + CountDownLatch latch = new CountDownLatch(1); + List> futures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + futures.add(executor.submit(() -> pinCurrentThreadAndAwait(latch))); + } + sleep(Duration.ofMillis(50)); // the time the threads will be pinned for + latch.countDown(); + for (Future future : futures) { + waitFor(future); + } + + Timer timer = registry.get("jvm.threads.virtual.pinned").tags(TAGS).timer(); + await().atMost(Duration.ofSeconds(2)).until(() -> timer.count() == 3); + assertThat(timer.max(MILLISECONDS)).isBetween(40d, 60d); // ~50ms + assertThat(timer.totalTime(MILLISECONDS)).isBetween(130d, 170d); // ~150ms + } + } + + private void pinCurrentThreadAndAwait(CountDownLatch latch) { + synchronized (new Object()) { // assumes that synchronized pins the thread + try { + if (!latch.await(2, TimeUnit.SECONDS)) { + throw new IllegalStateException("Timed out waiting for latch"); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private void sleep(Duration duration) { + try { + Thread.sleep(duration); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void waitFor(Future future) { + try { + future.get(); + } + catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + finally { + future.cancel(true); + } + } + +} diff --git a/settings.gradle b/settings.gradle index 1c5a028c0..621ccc626 100644 --- a/settings.gradle +++ b/settings.gradle @@ -47,6 +47,7 @@ include 'concurrency-tests' include 'micrometer-bom' include 'micrometer-jakarta9' include 'micrometer-java11' +include 'micrometer-java21' include 'micrometer-jetty11' include 'micrometer-jetty12' include 'micrometer-osgi-test'