Skip to content

Commit

Permalink
Virtual threads metrics (#5067)
Browse files Browse the repository at this point in the history
Creates a new module micrometer-java21 for code that needs to be baselined on Java 21, including the added virtual threads metrics. The instrumentation is JFR-based, although that is an implementation detail. There are some configuration options for the instrumentation but they are not exposed as public API until there is a clear need for that from users.

Fixes gh-3956

Co-authored-by: Jonatan Ivanov <jonatan.ivanov@gmail.com>
  • Loading branch information
ArtyomGabeev and jonatan-ivanov committed Sep 24, 2024
1 parent 4bbc13f commit 4b6b761
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ subprojects {

check.dependsOn("testModules")

if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp
if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw', 'micrometer-java21'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp
apply plugin: 'me.champeau.gradle.japicmp'
apply plugin: 'de.undercouch.download'

Expand Down
40 changes: 40 additions & 0 deletions micrometer-java21/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
description 'Micrometer core classes that require Java 21'

// skip this module when building with jdk <21
if (!javaLanguageVersion.canCompileOrRun(21)) {
project.tasks.configureEach { task -> task.enabled = false }
}

dependencies {
api project(':micrometer-core')

testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
}

java {
targetCompatibility = 21
}

tasks.withType(JavaCompile).configureEach {
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
options.release = 21
}

task reflectiveTests(type: Test) {
useJUnitPlatform {
includeTags 'reflective'
}

// This hack is needed since VirtualThreadMetricsReflectiveTests utilizes reflection against java.lang, see its javadoc
jvmArgs += ['--add-opens', 'java.base/java.lang=ALL-UNNAMED']
}

test {
dependsOn reflectiveTests
useJUnitPlatform {
excludeTags 'reflective'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2024 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.MeterBinder;
import jdk.jfr.consumer.RecordingStream;

import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;

import static java.util.Collections.emptyList;

/**
* Instrumentation support for Virtual Threads, see:
* https://openjdk.org/jeps/425#JDK-Flight-Recorder-JFR
*
* @author Artyom Gabeev
* @since 1.14.0
*/
public class VirtualThreadMetrics implements MeterBinder, Closeable {

private static final String PINNED_EVENT = "jdk.VirtualThreadPinned";

private static final String SUBMIT_FAILED_EVENT = "jdk.VirtualThreadSubmitFailed";

private final RecordingStream recordingStream;

private final Iterable<Tag> tags;

public VirtualThreadMetrics() {
this(new RecordingConfig(), emptyList());
}

public VirtualThreadMetrics(Iterable<Tag> tags) {
this(new RecordingConfig(), tags);
}

private VirtualThreadMetrics(RecordingConfig config, Iterable<Tag> tags) {
this.recordingStream = createRecordingStream(config);
this.tags = tags;
}

@Override
public void bindTo(MeterRegistry registry) {
Timer pinnedTimer = Timer.builder("jvm.threads.virtual.pinned")
.description("The duration while the virtual thread was pinned without releasing its platform thread")
.tags(tags)
.register(registry);

Counter submitFailedCounter = Counter.builder("jvm.threads.virtual.submit.failed")
.description("The number of events when starting or unparking a virtual thread failed")
.tags(tags)
.register(registry);

recordingStream.onEvent(PINNED_EVENT, event -> pinnedTimer.record(event.getDuration()));
recordingStream.onEvent(SUBMIT_FAILED_EVENT, event -> submitFailedCounter.increment());
}

private RecordingStream createRecordingStream(RecordingConfig config) {
RecordingStream recordingStream = new RecordingStream();
recordingStream.enable(PINNED_EVENT).withThreshold(config.pinnedThreshold);
recordingStream.enable(SUBMIT_FAILED_EVENT);
recordingStream.setMaxAge(config.maxAge);
recordingStream.setMaxSize(config.maxSizeBytes);
recordingStream.startAsync();

return recordingStream;
}

@Override
public void close() {
recordingStream.close();
}

private record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold) {
private RecordingConfig() {
this(Duration.ofSeconds(5), 10L * 1024 * 1024, Duration.ofMillis(20));
}

private RecordingConfig {
Objects.requireNonNull(maxAge, "maxAge parameter must not be null");
Objects.requireNonNull(pinnedThreshold, "pinnedThreshold must not be null");
if (maxSizeBytes < 0) {
throw new IllegalArgumentException("maxSizeBytes must be positive");
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Instrumentation of JDK classes.
*/
@NonNullApi
@NonNullFields
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.common.lang.NonNullApi;
import io.micrometer.common.lang.NonNullFields;
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2024 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Constructor;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

import static java.lang.Thread.State.WAITING;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

/**
* Tests for {@link VirtualThreadMetrics}. If you run these tests from your IDE,
* {@link #submitFailedEventsShouldBeRecorded()} might fail depending on your setup. This
* is because the test (through {@link #virtualThreadFactoryFor(Executor)}) utilizes
* reflection against the {@code java.lang} package which needs to be explicitly enabled.
* If you run into such an issue you can either change your setup and let your IDE run the
* tests utilizing the build system (Gradle) or add the following JVM arg to your test
* config: {@code --add-opens java.base/java.lang=ALL-UNNAMED}
*
* @author Artyom Gabeev
* @author Jonatan Ivanov
*/
@Tag("reflective")
class VirtualThreadMetricsReflectiveTests {

private static final Tags TAGS = Tags.of("k", "v");

private SimpleMeterRegistry registry;

private VirtualThreadMetrics virtualThreadMetrics;

@BeforeEach
void setUp() {
registry = new SimpleMeterRegistry();
virtualThreadMetrics = new VirtualThreadMetrics(TAGS);
virtualThreadMetrics.bindTo(registry);
}

@AfterEach
void tearDown() {
virtualThreadMetrics.close();
}

/**
* Uses a similar approach as the JDK tests to make starting or unparking a virtual
* thread fail, see {@link #virtualThreadFactoryFor(Executor)} and <a href=
* "https://github.com/openjdk/jdk/blob/fdfe503d016086cf78b5a8c27dbe45f0261c68ab/test/jdk/java/lang/Thread/virtual/JfrEvents.java#L143-L187">JfrEvents.java</a>
*/
@Test
void submitFailedEventsShouldBeRecorded() {
try (ExecutorService cachedPool = Executors.newCachedThreadPool()) {
ThreadFactory factory = virtualThreadFactoryFor(cachedPool);
Thread thread = factory.newThread(LockSupport::park);
thread.start();

await().atMost(Duration.ofSeconds(2)).until(() -> thread.getState() == WAITING);
cachedPool.shutdown();

// unpark, the pool was shut down, this should fail
assertThatThrownBy(() -> LockSupport.unpark(thread)).isInstanceOf(RejectedExecutionException.class);

Counter counter = registry.get("jvm.threads.virtual.submit.failed").tags(TAGS).counter();
await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 1);

// park, the pool was shut down, this should fail
assertThatThrownBy(() -> factory.newThread(LockSupport::park).start())
.isInstanceOf(RejectedExecutionException.class);
await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 2);
}
}

/**
* Creates a {@link ThreadFactory} for virtual threads. The created virtual threads
* will be bound to the provided platform thread pool instead of a default
* ForkJoinPool. At its current form, this is a hack, it utilizes reflection to supply
* the platform thread pool. It seems though there is no other way of doing this, the
* JDK tests are also utilizing reflection to do the same, see: <a href=
* "https://github.com/openjdk/jdk/blob/fdfe503d016086cf78b5a8c27dbe45f0261c68ab/test/lib/jdk/test/lib/thread/VThreadScheduler.java#L71-L90">VThreadScheduler.java</a>
* @param pool platform pool
* @return virtual thread factory bound to the provided platform pool
*/
private static ThreadFactory virtualThreadFactoryFor(Executor pool) {
try {
Class<?> clazz = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder");
Constructor<?> constructor = clazz.getDeclaredConstructor(Executor.class);
constructor.setAccessible(true);
return ((Thread.Builder.OfVirtual) constructor.newInstance(pool)).factory();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

}
Loading

0 comments on commit 4b6b761

Please sign in to comment.