From 59126b5cfc058efc3be3c04c534616a1b69723cb Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Mon, 3 Apr 2023 15:01:34 +0300 Subject: [PATCH 1/3] instrument ce --- build.sbt | 4 +- .../org/typelevel/otel4s/metrics/Meter.scala | 7 +++ .../metrics/ObservableInstrumentBuilder.scala | 36 ++++++++++- .../metrics/ObservableUpDownCounter.scala | 2 +- .../metrics/SyncInstrumentBuilder.scala | 8 +++ .../otel4s/metrics/preset/CEMetrics.scala | 60 +++++++++++++++++++ .../java/metrics/HistogramBuilderImpl.scala | 29 +++++++++ 7 files changed, 140 insertions(+), 6 deletions(-) create mode 100644 core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala diff --git a/build.sbt b/build.sbt index a97ce1db7..c818fc790 100644 --- a/build.sbt +++ b/build.sbt @@ -24,7 +24,7 @@ ThisBuild / crossScalaVersions := Seq(Scala213, "3.2.2") ThisBuild / scalaVersion := Scala213 // the default Scala val CatsVersion = "2.9.0" -val CatsEffectVersion = "3.4.8" +val CatsEffectVersion = "3.5-4848d2e-20230329T044947Z-SNAPSHOT" val CatsMtlVersion = "1.3.0" val FS2Version = "3.6.1" val MUnitVersion = "1.0.0-M7" @@ -86,7 +86,7 @@ lazy val `core-metrics` = crossProject(JVMPlatform, JSPlatform, NativePlatform) .settings( name := "otel4s-core-metrics", libraryDependencies ++= Seq( - "org.typelevel" %%% "cats-effect-kernel" % CatsEffectVersion, + "org.typelevel" %%% "cats-effect" % CatsEffectVersion, "org.typelevel" %%% "cats-effect-testkit" % CatsEffectVersion % Test ) ) diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/Meter.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/Meter.scala index 152cc8de6..7919522cf 100644 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/Meter.scala +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/Meter.scala @@ -63,6 +63,13 @@ trait Meter[F[_]] { name: String ): SyncInstrumentBuilder[F, UpDownCounter[F, Long]] + def observableGauge( + name: String + ): ObservableInstrumentBuilder[F, Double, ObservableGauge] + + def observableUpDownCounter( + name: String + ): ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter] } object Meter { diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableInstrumentBuilder.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableInstrumentBuilder.scala index dce174ce2..3952ae355 100644 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableInstrumentBuilder.scala +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableInstrumentBuilder.scala @@ -16,8 +16,8 @@ package org.typelevel.otel4s.metrics -trait ObservableInstrumentBuilder[F[_], A] { - type Self <: ObservableInstrumentBuilder[F, A] +trait ObservableInstrumentBuilder[F[_], Input, Instrument[_[_], _]] { + type Self <: ObservableInstrumentBuilder[F, Input, Instrument] /** Sets the unit of measure for this instrument. * @@ -45,5 +45,35 @@ trait ObservableInstrumentBuilder[F[_], A] { /** Creates an instrument with the given `unit` and `description` (if any). */ - def create: F[A] + def create(callback: F[Input]): F[Instrument[F, Input]] + + /** Creates an instrument with the given callback, using `unit` and + * `description` (if any). + * + * The callback will be called when the instrument is being observed. + * + * Callbacks are expected to abide by the following restrictions: + * - Short-living and (ideally) non-blocking + * - Run in a finite amount of time + * - Safe to call repeatedly, across multiple threads + * + * @param cb + * The callback which observes measurements when invoked + */ + def createWithCallback( + cb: ObservableMeasurement[F, A] => F[Unit] + ): Resource[F, Instrument] +} + +trait ObservableMeasurement[F[_], A] { + + /** Records a value with a set of attributes. + * + * @param value + * the value to record + * + * @param attributes + * the set of attributes to associate with the value + */ + def record(value: A, attributes: Attribute[_]*): F[Unit] } diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableUpDownCounter.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableUpDownCounter.scala index 6d9754fc2..cad9abbcc 100644 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableUpDownCounter.scala +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/ObservableUpDownCounter.scala @@ -16,4 +16,4 @@ package org.typelevel.otel4s.metrics -trait ObservableUpDownCounter[F, A] +trait ObservableUpDownCounter[F[_], A] diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala index a8b27a90d..a01095592 100644 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala @@ -47,3 +47,11 @@ trait SyncInstrumentBuilder[F[_], A] { */ def create: F[A] } + +trait AsyncInstrumentBuilder[F[_], Input, Instrument[_[_], _]] { + type Self <: AsyncInstrumentBuilder[F, Input, Instrument] + + def withUnit(unit: String): Self + def withDescription(description: String): Self + def create(callback: F[Input]): F[Instrument[F, Input]] +} \ No newline at end of file diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala new file mode 100644 index 000000000..06ce133be --- /dev/null +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala @@ -0,0 +1,60 @@ +package org.typelevel.otel4s.metrics +package preset + +import cats.effect.Sync +import cats.effect.unsafe._ + +object CEMetrics { + + def make[F[_]: Sync: Meter](e: IORuntimeMetrics) = { + val meter = implicitly[Meter[F]] + + e.cpuStarvation.map { cpuStarvation => } + + e.compute.map { compute => + val wtc = meter + .observableUpDownCounter("worker-thread-count") + .create(Sync[F].delay(compute.workerThreadCount())) + + val wtc = meter + .observableUpDownCounter("active-thread-count") + .create(Sync[F].delay(compute.activeThreadCount())) + + val stc = meter + .observableUpDownCounter("searching-thread-count") + .create(Sync[F].delay(compute.searchingThreadCount())) + + val stc = meter + .observableUpDownCounter("searching-thread-count") + .create(Sync[F].delay(compute.searchingThreadCount())) + + val stc = meter + .observableUpDownCounter("searching-thread-count") + .create(Sync[F].delay(compute.searchingThreadCount())) + + val stc = meter + .observableUpDownCounter("searching-thread-count") + .create(Sync[F].delay(compute.searchingThreadCount())) + } + + } + + /* + def starvationCount(): Long + def maxClockDriftMs(): Long + def currentClockDriftMs(): Long + */ + + /* + + compute { + def workerThreadCount(): Int + def activeThreadCount(): Int + def searchingThreadCount(): Int + def blockedWorkerThreadCount(): Int + def localQueueFiberCount(): Long + def suspendedFiberCount(): Long + } + + */ +} diff --git a/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala b/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala index 7392d3b7a..0233ffb9a 100644 --- a/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala +++ b/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala @@ -19,6 +19,7 @@ package java package metrics import cats.effect.kernel.Sync +import cats.effect.std.Dispatcher import io.opentelemetry.api.metrics.{Meter => JMeter} import org.typelevel.otel4s.metrics._ @@ -40,6 +41,34 @@ private[java] case class HistogramBuilderImpl[F[_]]( val b = jMeter.histogramBuilder(name) unit.foreach(b.setUnit) description.foreach(b.setDescription) + jMeter.counterBuilder().buildWithCallback() new HistogramImpl(b.build) + + jMeter.gaugeBuilder().buildWithCallback(m => m.record()) } + + Dispatcher.sequential() + + import _root_.java.lang.management.ManagementFactory + import javax.management.{MBeanServer, ObjectName} + + val server: MBeanServer = ManagementFactory.getPlatformMBeanServer + val mbeanName = new ObjectName("cats.effect.metrics:type=CpuStarvation") + + server.getAttribute(mbeanName, "CpuStarvationCount").asInstanceOf[Long] + + def collect(): java.util.List[Collector.MetricFamilySamples] = + java.util.Arrays.asList( + createGauge("cats_effect_cpu_starvation_count", "", "CpuStarvationCount"), + createGauge("cats_effect_max_clock_drift_ms", "", "MaxClockDriftMs"), + createGauge("cats_effect_current_clock_drift_ms", "", "CurrentClockDriftMs"), + ) + + private def createGauge(metric: String, help: String, attributeName: String): GaugeMetricFamily = { + val metricFamily = new GaugeMetricFamily(metric, help, Collections.emptyList[String]()) + val value = server.getAttribute(mbeanName, attributeName).asInstanceOf[Long] + metricFamily.addMetric(Collections.emptyList[String](), value.toDouble) + } + + cats.effect.unsafe.IORuntime.global.scheduler } From b96e1a07caf1b071bbb511b99f63280a360a0302 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Tue, 11 Apr 2023 16:22:41 +0300 Subject: [PATCH 2/3] Draft integration with IORuntimeMetrics --- build.sbt | 1 + .../metrics/SyncInstrumentBuilder.scala | 8 -- .../otel4s/metrics/preset/CEMetrics.scala | 60 ------------- .../otel4s/metrics/preset/IOMetrics.scala | 84 +++++++++++++++++++ .../main/scala/RuntimeMetricsExample.scala | 47 +++++++++++ .../java/metrics/HistogramBuilderImpl.scala | 29 ------- 6 files changed, 132 insertions(+), 97 deletions(-) delete mode 100644 core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala create mode 100644 core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/IOMetrics.scala create mode 100644 examples/src/main/scala/RuntimeMetricsExample.scala diff --git a/build.sbt b/build.sbt index 9b14a3a04..c8484c5e4 100644 --- a/build.sbt +++ b/build.sbt @@ -232,6 +232,7 @@ lazy val examples = project name := "otel4s-examples", libraryDependencies ++= Seq( "io.opentelemetry" % "opentelemetry-exporter-otlp" % OpenTelemetryVersion, + "io.opentelemetry" % "opentelemetry-exporter-logging" % OpenTelemetryVersion, "io.opentelemetry" % "opentelemetry-sdk" % OpenTelemetryVersion, "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % s"${OpenTelemetryVersion}-alpha", "io.opentelemetry" % "opentelemetry-extension-trace-propagators" % s"${OpenTelemetryVersion}" % Runtime diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala index a01095592..a8b27a90d 100644 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/SyncInstrumentBuilder.scala @@ -47,11 +47,3 @@ trait SyncInstrumentBuilder[F[_], A] { */ def create: F[A] } - -trait AsyncInstrumentBuilder[F[_], Input, Instrument[_[_], _]] { - type Self <: AsyncInstrumentBuilder[F, Input, Instrument] - - def withUnit(unit: String): Self - def withDescription(description: String): Self - def create(callback: F[Input]): F[Instrument[F, Input]] -} \ No newline at end of file diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala deleted file mode 100644 index 06ce133be..000000000 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/CEMetrics.scala +++ /dev/null @@ -1,60 +0,0 @@ -package org.typelevel.otel4s.metrics -package preset - -import cats.effect.Sync -import cats.effect.unsafe._ - -object CEMetrics { - - def make[F[_]: Sync: Meter](e: IORuntimeMetrics) = { - val meter = implicitly[Meter[F]] - - e.cpuStarvation.map { cpuStarvation => } - - e.compute.map { compute => - val wtc = meter - .observableUpDownCounter("worker-thread-count") - .create(Sync[F].delay(compute.workerThreadCount())) - - val wtc = meter - .observableUpDownCounter("active-thread-count") - .create(Sync[F].delay(compute.activeThreadCount())) - - val stc = meter - .observableUpDownCounter("searching-thread-count") - .create(Sync[F].delay(compute.searchingThreadCount())) - - val stc = meter - .observableUpDownCounter("searching-thread-count") - .create(Sync[F].delay(compute.searchingThreadCount())) - - val stc = meter - .observableUpDownCounter("searching-thread-count") - .create(Sync[F].delay(compute.searchingThreadCount())) - - val stc = meter - .observableUpDownCounter("searching-thread-count") - .create(Sync[F].delay(compute.searchingThreadCount())) - } - - } - - /* - def starvationCount(): Long - def maxClockDriftMs(): Long - def currentClockDriftMs(): Long - */ - - /* - - compute { - def workerThreadCount(): Int - def activeThreadCount(): Int - def searchingThreadCount(): Int - def blockedWorkerThreadCount(): Int - def localQueueFiberCount(): Long - def suspendedFiberCount(): Long - } - - */ -} diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/IOMetrics.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/IOMetrics.scala new file mode 100644 index 000000000..37f5ce5fc --- /dev/null +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/preset/IOMetrics.scala @@ -0,0 +1,84 @@ +package org.typelevel.otel4s.metrics +package preset + +import cats.effect.{Resource, Sync} +import cats.effect.unsafe._ +import cats.syntax.apply._ +import cats.syntax.functor._ + +object IOMetrics { + + def fromRuntimeMetrics[F[_]: Sync: Meter]( + metrics: IORuntimeMetrics + ): Resource[F, Unit] = { + val meter = implicitly[Meter[F]] + + val starvation = metrics.cpuStarvation.map { cpuStarvation => + val starvationCount = meter + .observableUpDownCounter("cpu-starvation-count") + .createWithCallback(cb => + Sync[F].defer(cb.record(cpuStarvation.starvationCount())) + ) + + val maxDrift = meter + .observableUpDownCounter("max-clock-drift-ms") + .createWithCallback(cb => + Sync[F].defer(cb.record(cpuStarvation.maxClockDriftMs())) + ) + + val currentDrift = meter + .observableUpDownCounter("current-clock-drift-ms") + .createWithCallback(cb => + Sync[F].defer(cb.record(cpuStarvation.currentClockDriftMs())) + ) + + (starvationCount, maxDrift, currentDrift).tupled.void + } + + val compute = metrics.compute.map { compute => + val wtc = meter + .observableUpDownCounter("worker-thread-count") + .createWithCallback(cb => + Sync[F].defer(cb.record(compute.workerThreadCount().toLong)) + ) + + val atc = meter + .observableUpDownCounter("active-thread-count") + .createWithCallback(cb => + Sync[F].defer(cb.record(compute.activeThreadCount().toLong)) + ) + + val stc = meter + .observableUpDownCounter("searching-thread-count") + .createWithCallback(cb => + Sync[F].defer(cb.record(compute.searchingThreadCount().toLong)) + ) + + val bwtc = meter + .observableUpDownCounter("blocker-worker-thread-count") + .createWithCallback(cb => + Sync[F].defer(cb.record(compute.blockedWorkerThreadCount().toLong)) + ) + + val lqfc = meter + .observableUpDownCounter("local-queue-fiber-count") + .createWithCallback(cb => + Sync[F].defer(cb.record(compute.localQueueFiberCount())) + ) + + val sfc = meter + .observableUpDownCounter("suspended-fiber-count") + .createWithCallback(cb => + Sync[F].defer(cb.record(compute.suspendedFiberCount())) + ) + + (wtc, atc, stc, bwtc, lqfc, sfc).tupled.void + } + + for { + _ <- starvation.getOrElse(Resource.unit[F]) + _ <- compute.getOrElse(Resource.unit[F]) + } yield () + } + +} diff --git a/examples/src/main/scala/RuntimeMetricsExample.scala b/examples/src/main/scala/RuntimeMetricsExample.scala new file mode 100644 index 000000000..b6a574eb1 --- /dev/null +++ b/examples/src/main/scala/RuntimeMetricsExample.scala @@ -0,0 +1,47 @@ +import cats.effect.{IO, IOApp, Resource} +import cats.syntax.foldable._ +import io.opentelemetry.sdk.OpenTelemetrySdk +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader +import org.typelevel.otel4s.java.OtelJava +import org.typelevel.otel4s.metrics.preset.IOMetrics + +import scala.jdk.CollectionConverters._ +import scala.util.Random + +object RuntimeMetricsExample extends IOApp.Simple { + def run: IO[Unit] = { + val jMetricReader = InMemoryMetricReader.create() + val jSdk = { + val meterProvider = SdkMeterProvider + .builder() + .registerMetricReader(jMetricReader) + .build() + + val sdk = OpenTelemetrySdk + .builder() + .setMeterProvider(meterProvider) + .build() + + sdk + } + + def printMetrics: IO[Unit] = + IO.delay(jMetricReader.collectAllMetrics().asScala.toList) + .flatMap(_.traverse_(v => IO.println(v.getName + " = " + v.getData))) + + Resource + .eval(OtelJava.forAsync[IO](jSdk)) + .evalMap(_.meterProvider.get("cats-effect-runtime-metrics")) + .use { implicit meter => + IOMetrics.fromRuntimeMetrics[IO](runtime.metrics).surround { + compute >> printMetrics + } + } + } + + private def compute: IO[Unit] = + IO.parReplicateAN(5)(20, IO.blocking(Thread.sleep(Random.nextInt(1000)))) + .void + +} diff --git a/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala b/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala index 0233ffb9a..7392d3b7a 100644 --- a/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala +++ b/java/metrics/src/main/scala/org/typelevel/otel4s/java/metrics/HistogramBuilderImpl.scala @@ -19,7 +19,6 @@ package java package metrics import cats.effect.kernel.Sync -import cats.effect.std.Dispatcher import io.opentelemetry.api.metrics.{Meter => JMeter} import org.typelevel.otel4s.metrics._ @@ -41,34 +40,6 @@ private[java] case class HistogramBuilderImpl[F[_]]( val b = jMeter.histogramBuilder(name) unit.foreach(b.setUnit) description.foreach(b.setDescription) - jMeter.counterBuilder().buildWithCallback() new HistogramImpl(b.build) - - jMeter.gaugeBuilder().buildWithCallback(m => m.record()) } - - Dispatcher.sequential() - - import _root_.java.lang.management.ManagementFactory - import javax.management.{MBeanServer, ObjectName} - - val server: MBeanServer = ManagementFactory.getPlatformMBeanServer - val mbeanName = new ObjectName("cats.effect.metrics:type=CpuStarvation") - - server.getAttribute(mbeanName, "CpuStarvationCount").asInstanceOf[Long] - - def collect(): java.util.List[Collector.MetricFamilySamples] = - java.util.Arrays.asList( - createGauge("cats_effect_cpu_starvation_count", "", "CpuStarvationCount"), - createGauge("cats_effect_max_clock_drift_ms", "", "MaxClockDriftMs"), - createGauge("cats_effect_current_clock_drift_ms", "", "CurrentClockDriftMs"), - ) - - private def createGauge(metric: String, help: String, attributeName: String): GaugeMetricFamily = { - val metricFamily = new GaugeMetricFamily(metric, help, Collections.emptyList[String]()) - val value = server.getAttribute(mbeanName, attributeName).asInstanceOf[Long] - metricFamily.addMetric(Collections.emptyList[String](), value.toDouble) - } - - cats.effect.unsafe.IORuntime.global.scheduler } From 5efc8b3160349184d7880a3baa0756a56dc0c191 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Wed, 12 Apr 2023 11:33:20 +0300 Subject: [PATCH 3/3] Tweak compute logic --- .../main/scala/RuntimeMetricsExample.scala | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/examples/src/main/scala/RuntimeMetricsExample.scala b/examples/src/main/scala/RuntimeMetricsExample.scala index b6a574eb1..4124006c0 100644 --- a/examples/src/main/scala/RuntimeMetricsExample.scala +++ b/examples/src/main/scala/RuntimeMetricsExample.scala @@ -1,3 +1,4 @@ +import cats.effect.std.Random import cats.effect.{IO, IOApp, Resource} import cats.syntax.foldable._ import io.opentelemetry.sdk.OpenTelemetrySdk @@ -6,8 +7,8 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader import org.typelevel.otel4s.java.OtelJava import org.typelevel.otel4s.metrics.preset.IOMetrics +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ -import scala.util.Random object RuntimeMetricsExample extends IOApp.Simple { def run: IO[Unit] = { @@ -27,21 +28,32 @@ object RuntimeMetricsExample extends IOApp.Simple { } def printMetrics: IO[Unit] = - IO.delay(jMetricReader.collectAllMetrics().asScala.toList) - .flatMap(_.traverse_(v => IO.println(v.getName + " = " + v.getData))) + for { + _ <- IO.println("New cycle: ") + metrics <- IO.delay(jMetricReader.collectAllMetrics().asScala.toList) + _ <- metrics.traverse_(v => IO.println(v.getName + " = " + v.getData)) + } yield () Resource .eval(OtelJava.forAsync[IO](jSdk)) .evalMap(_.meterProvider.get("cats-effect-runtime-metrics")) .use { implicit meter => IOMetrics.fromRuntimeMetrics[IO](runtime.metrics).surround { - compute >> printMetrics + printMetrics.delayBy(500.millis).foreverM.background.surround { + compute + } } } } private def compute: IO[Unit] = - IO.parReplicateAN(5)(20, IO.blocking(Thread.sleep(Random.nextInt(1000)))) - .void + Random.scalaUtilRandom[IO].flatMap { random => + val io = random.betweenLong(10, 3000).flatMap { delay => + if (delay % 2 == 0) IO.blocking(Thread.sleep(delay)) + else IO.delay(Thread.sleep(delay)) + } + + IO.parReplicateAN(30)(100, io).void + } }