From a28633af13cd15d0c79896e6b7ec6fd454eeeb23 Mon Sep 17 00:00:00 2001 From: anormanariasystems Date: Fri, 11 Aug 2023 10:10:20 -0700 Subject: [PATCH] creating a pekko metrics libary in the manner of the akka 26 one https://github.com/erikvanoosten/metrics-scala/issues/482 --- .gitignore | 3 +- build.sbt | 17 +- .../grons/metrics4/scala/ActorMetrics.scala | 177 +++++++++++++++++ .../ActorInstrumentedLifeCycleSpec.scala | 112 +++++++++++ .../metrics4/scala/ActorMetricsSpec.scala | 181 ++++++++++++++++++ 5 files changed, 488 insertions(+), 2 deletions(-) create mode 100644 metrics-pekko/src/main/scala/nl/grons/metrics4/scala/ActorMetrics.scala create mode 100644 metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorInstrumentedLifeCycleSpec.scala create mode 100644 metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorMetricsSpec.scala diff --git a/.gitignore b/.gitignore index e74d1486..3e78837e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ target/ .project .settings/ .cache -.bsp/ \ No newline at end of file +.bsp/ +/bin/ diff --git a/build.sbt b/build.sbt index 2fd819b5..4874f1d6 100644 --- a/build.sbt +++ b/build.sbt @@ -38,7 +38,7 @@ lazy val commonSettings = Seq( ThisBuild / publishTo := sonatypePublishTo.value lazy val root = (project in file(".")) - .aggregate(metricsScala, metricsScalaHdr, metricsAkka24, metricsAkka25, metricsAkka26) + .aggregate(metricsScala, metricsScalaHdr, metricsAkka24, metricsAkka25, metricsAkka26, metricsPekko) .settings( crossScalaVersions := Nil, publishArtifact := false, @@ -77,6 +77,21 @@ lazy val metricsScalaHdr = (project in file("metrics-scala-hdr")) mimaPreviousArtifacts := mimaPrevious(scalaVersion.value) ) +lazy val metricsPekko = (project in file("metrics-pekko")) + .dependsOn(metricsScala) + .settings( + commonSettings, + crossScalaVersions := Seq("3.1.3", "2.13.11", "2.12.18"), + name := "metrics4-pekko", + description := "metrics-scala for pekko 1.0.1 and Scala " + CrossVersion.binaryScalaVersion(scalaVersion.value), + libraryDependencies ++= Seq( + "org.apache.pekko" %% "pekko-actor" % "1.0.1", + "org.apache.pekko" %% "pekko-testkit" % "1.0.1" % Test + ), + sourceDirectory := baseDirectory.value.getParentFile / "metrics-pekko" / "src", + mimaPreviousArtifacts := mimaPrevious(scalaVersion.value) + ) + lazy val metricsAkka26 = (project in file("metrics-akka-26")) .dependsOn(metricsScala) .settings( diff --git a/metrics-pekko/src/main/scala/nl/grons/metrics4/scala/ActorMetrics.scala b/metrics-pekko/src/main/scala/nl/grons/metrics4/scala/ActorMetrics.scala new file mode 100644 index 00000000..fb623440 --- /dev/null +++ b/metrics-pekko/src/main/scala/nl/grons/metrics4/scala/ActorMetrics.scala @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2013-2022 Erik van Oosten + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nl.grons.metrics4.scala + +import org.apache.pekko.actor.Actor + +/** + * Stackable [[Actor]] trait which links [[Gauge]] life cycles with the actor life cycle. + * + * When an actor is restarted, gauges can not be created again under the same name in the same metric registry. + * By mixing in this trait, all gauges created in this actor will be automatically unregistered before this actor + * restarts. + * + * Use it as follows: + * {{{ + * object Application { + * // The application wide metrics registry. + * val metricRegistry = new com.codahale.metrics.MetricRegistry() + * } + * trait Instrumented extends InstrumentedBuilder { + * val metricRegistry = Application.metricRegistry + * } + * + * class ExampleActor extends Actor with Instrumented with ActorInstrumentedLifecycle { + * + * var counter = 0 + * + * // The following gauge will automatically unregister before a restart of this actor. + * metrics.gauge("sample-gauge"){ + * counter + * } + * + * override def receive = { + * case 'increment => + * counter += 1 + * doWork() + * } + * + * def doWork(): Unit = { + * // etc etc etc + * } + * } + * + * }}} + */ +trait ActorInstrumentedLifeCycle extends Actor { self: InstrumentedBuilder => + + abstract override def preRestart(reason: Throwable, message: Option[Any]) = { + metrics.unregisterGauges() + super.preRestart(reason, message) + } + +} + +/** + * Stackable actor trait which counts received messages. + * + * Metric name defaults to the class of the actor (e.g. `ExampleActor` below) + .`receiveCounter` + * + * Use it as follows: + * {{{ + * object Application { + * // The application wide metrics registry. + * val metricRegistry = new com.codahale.metrics.MetricRegistry() + * } + * trait Instrumented extends InstrumentedBuilder { + * val metricRegistry = Application.metricRegistry + * } + * + * class ExampleActor extends Actor { + * + * def receive = { + * case _ => doWork() + * } + * } + * + * class InstrumentedExampleActor extends ExampleActor with ReceiveCounterActor with Instrumented + * + * }}} + */ +trait ReceiveCounterActor extends Actor { self: InstrumentedBuilder => + + def receiveCounterName: String = "receiveCounter" + lazy val counter: Counter = metrics.counter(receiveCounterName) + + private[this] lazy val wrapped = counter.count(super.receive) + + abstract override def receive = wrapped + +} + +/** + * Stackable actor trait which times the message receipt. + * + * Metric name defaults to the class of the actor (e.g. `ExampleActor` below) + `.receiveTimer` + * + * Use it as follows: + * {{{ + * object Application { + * // The application wide metrics registry. + * val metricRegistry = new com.codahale.metrics.MetricRegistry() + * } + * trait Instrumented extends InstrumentedBuilder { + * val metricRegistry = Application.metricRegistry + * } + * + * class ExampleActor extends Actor { + * + * def receive = { + * case _ => doWork() + * } + * } + * + * class InstrumentedExampleActor extends ExampleActor with ReceiveCounterActor with Instrumented + * + * }}} + */ +trait ReceiveTimerActor extends Actor { self: InstrumentedBuilder => + + def receiveTimerName: String = "receiveTimer" + lazy val timer: Timer = metrics.timer(receiveTimerName) + + private[this] lazy val wrapped = timer.timePF(super.receive) + + abstract override def receive = wrapped +} + +/** + * Stackable actor trait which meters thrown exceptions. + * + * Metric name defaults to the class of the actor (e.g. `ExampleActor` below) + `.receiveExceptionMeter` + * + * Use it as follows: + * {{{ + * object Application { + * // The application wide metrics registry. + * val metricRegistry = new com.codahale.metrics.MetricRegistry() + * } + * trait Instrumented extends InstrumentedBuilder { + * val metricRegistry = Application.metricRegistry + * } + * + * class ExampleActor extends Actor { + * + * def receive = { + * case _ => doWork() + * } + * } + * + * class InstrumentedExampleActor extends ExampleActor with ReceiveCounterActor with Instrumented + * + * }}} + */ +trait ReceiveExceptionMeterActor extends Actor { self: InstrumentedBuilder => + + def receiveExceptionMeterName: String = "receiveExceptionMeter" + lazy val meter: Meter = metrics.meter(receiveExceptionMeterName) + + private[this] lazy val wrapped = meter.exceptionMarkerPF(super.receive) + + abstract override def receive = wrapped + +} diff --git a/metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorInstrumentedLifeCycleSpec.scala b/metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorInstrumentedLifeCycleSpec.scala new file mode 100644 index 00000000..6ad46d2b --- /dev/null +++ b/metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorInstrumentedLifeCycleSpec.scala @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2013-2022 Erik van Oosten + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nl.grons.metrics4.scala + +import org.apache.pekko.actor.{Actor, ActorSystem, Props} +import org.apache.pekko.testkit._ +import com.codahale.metrics._ +import com.typesafe.config.ConfigFactory +import nl.grons.metrics4.scala.ActorInstrumentedLifeCycleSpec._ +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funspec.AsyncFunSpecLike +import org.scalatest.matchers.should.Matchers + +import scala.collection.JavaConverters._ +import scala.concurrent.Promise + +object ActorInstrumentedLifeCycleSpec { + + // Don't log all those intentional exceptions + val NonLoggingActorSystem = ActorSystem("lifecycle-spec", ConfigFactory.parseMap(Map("pekko.loglevel" -> "OFF").asJava)) + + val TestMetricRegistry = new com.codahale.metrics.MetricRegistry() + + trait Instrumented extends InstrumentedBuilder { + val metricRegistry: MetricRegistry = TestMetricRegistry + } + + class ExampleActor(restarted: Promise[Boolean]) extends Actor with Instrumented with ActorInstrumentedLifeCycle { + + var counter = 0 + + // The following gauge will automatically unregister before a restart of this actor. + metrics.gauge("sample-gauge") { + counter + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + self ! Symbol("prerestart") + super.preRestart(reason, message) + } + + def receive: Receive = { + case Symbol("increment") => + counter += 1 + case Symbol("get") => + sender() ! counter + case Symbol("error") => + throw new RuntimeException("BOOM!") + case Symbol("prerestart") => + restarted.success(true) + } + + } +} + +class ActorInstrumentedLifeCycleSpec extends TestKit(NonLoggingActorSystem) with AsyncFunSpecLike with ImplicitSender with Matchers with ScalaFutures with BeforeAndAfterAll { + + def report: Option[Any] = { + case class NameFilter(prefix: String) extends MetricFilter { + override def matches(name: String, metric: Metric): Boolean = name.startsWith(prefix) + } + + TestMetricRegistry + .getGauges(NameFilter("nl.grons.metrics4.scala")).asScala + .headOption + .map { case (_, g) => g.getValue } + } + + describe("an actor with gauges needing lifecycle management") { + val restartedPromise = Promise[Boolean]() + val ar = system.actorOf(Props(new ExampleActor(restartedPromise))) + + it("correctly builds a gauge that reports the correct value") { + ar ! Symbol("increment") + ar ! Symbol("increment") + ar ! Symbol("increment") + ar ! Symbol("get") + expectMsg(3) + report shouldBe Some(3) + } + + it("rebuilds the gauge on actor restart") { + ar ! Symbol("increment") + ar ! Symbol("increment") + ar ! Symbol("error") + ar ! Symbol("increment") + ar ! Symbol("get") + expectMsg(1) + whenReady(restartedPromise.future)(_ shouldBe true) + report shouldBe Some(1) + } + } + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } +} diff --git a/metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorMetricsSpec.scala b/metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorMetricsSpec.scala new file mode 100644 index 00000000..1cbe4276 --- /dev/null +++ b/metrics-pekko/src/test/scala/nl/grons/metrics4/scala/ActorMetricsSpec.scala @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2013-2022 Erik van Oosten + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nl.grons.metrics4.scala + +import org.apache.pekko.actor.{Actor, ActorSystem} +import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers._ +import org.scalatest.OneInstancePerTest + +import scala.collection.JavaConverters._ + +class ActorMetricsSpec extends AnyFunSpec with OneInstancePerTest { + import ActorMetricsSpec._ + import org.apache.pekko.testkit.TestActorRef + + implicit private val system: ActorSystem = ActorSystem() + + describe("A counter actor") { + it("invokes original receive and increments counter on new messages") { + val ref = TestActorRef(new CounterTestActor) + val counterName = "nl.grons.metrics4.scala.ActorMetricsSpec.CounterTestActor.receiveCounter" + + counterValue(counterName) should be (0) + + ref.receive("test") + + ref.underlyingActor.messages should contain only "test" + counterValue(counterName) should be (1) + } + + it("can override metric name") { + val ref = TestActorRef(new CounterTestActorNameOverride) + val counterName = "nl.grons.metrics4.scala.ActorMetricsSpec.CounterTestActorNameOverride.overrideReceiveCounter" + + counterValue(counterName) should be (0) + + ref.receive("test") + + ref.underlyingActor.messages should contain only "test" + counterValue(counterName) should be (1) + } + } + + describe("A timer actor") { + it("invokes original receive and times message processing") { + val ref = TestActorRef(new TimerTestActor) + val timerName = "nl.grons.metrics4.scala.ActorMetricsSpec.TimerTestActor.receiveTimer" + + timerCountValue(timerName) should be (0) + + ref.receive("test") + + ref.underlyingActor.messages should contain only "test" + timerCountValue(timerName) should be (1) + } + + it("can override metric name") { + val ref = TestActorRef(new TimerTestActor { + override def receiveTimerName: String = "something-else" + }) + val timerName = "nl.grons.metrics4.scala.ActorMetricsSpec.anon.something-else" + + timerCountValue(timerName) should be (0) + + ref.receive("test") + + ref.underlyingActor.messages should contain only "test" + timerCountValue(timerName) should be (1) + } + } + + describe("A exception meter actor") { + it("invokes original receive and meters thrown exceptions") { + val ref = TestActorRef(new ExceptionMeterTestActor) + val meterName = "nl.grons.metrics4.scala.ActorMetricsSpec.ExceptionMeterTestActor.receiveExceptionMeter" + + meterCountValue(meterName) should be (0) + + intercept[RuntimeException] { ref.receive("test") } + + ref.underlyingActor.messages should contain only "test" + meterCountValue(meterName) should be (1) + } + } + + describe("A composed actor") { + it("counts and times processing of messages") { + val ref = TestActorRef(new ComposedActor) + val counterName = "nl.grons.metrics4.scala.ActorMetricsSpec.ComposedActor.receiveCounter" + val timerName = "nl.grons.metrics4.scala.ActorMetricsSpec.ComposedActor.receiveTimer" + val meterName = "nl.grons.metrics4.scala.ActorMetricsSpec.ComposedActor.receiveExceptionMeter" + + counterValue(counterName) should be (0) + timerCountValue(timerName) should be (0) + meterCountValue(meterName) should be (0) + + intercept[RuntimeException] { ref.receive("test") } + + ref.underlyingActor.messages should contain only "test" + counterValue(counterName) should be (1) + timerCountValue(timerName) should be (1) + meterCountValue(meterName) should be (1) + } + } + + private def counterValue(counterName: String): Long = { + testMetricRegistry.getCounters(nameFilter(counterName)).values().asScala.headOption.map(_.getCount).getOrElse { + fail(s"Counter '${counterName}' was not registered. Registered counters: " + testMetricRegistry.getCounters.keySet().asScala) + } + } + + private def timerCountValue(timerName: String): Long = { + testMetricRegistry.getTimers(nameFilter(timerName)).values().asScala.headOption.map(_.getCount).getOrElse { + fail(s"Timer '${timerName}' was not registered. Registered timers: " + testMetricRegistry.getTimers.keySet().asScala) + } + } + + private def meterCountValue(meterName: String): Long = { + testMetricRegistry.getMeters(nameFilter(meterName)).values().asScala.headOption.map(_.getCount).getOrElse { + fail(s"Meter '${meterName}' was not registered. Registered meters: " + testMetricRegistry.getMeters.keySet().asScala) + } + } + + private def nameFilter(filterName: String) = new MetricFilter { + override def matches(name: String, metric: Metric): Boolean = name == filterName + } +} + +object ActorMetricsSpec { + + val testMetricRegistry = new MetricRegistry() + + trait ActorMetricsSpecInstrumented extends InstrumentedBuilder { + val metricRegistry: MetricRegistry = testMetricRegistry + } + + class TestActor extends Actor with ActorMetricsSpecInstrumented { + val messages = new scala.collection.mutable.ListBuffer[String]() + + def receive: Actor.Receive = { case message: String => messages += message } + } + + class ExceptionThrowingTestActor extends Actor with ActorMetricsSpecInstrumented { + val messages = new scala.collection.mutable.ListBuffer[String]() + + private def storeMessage: Actor.Receive = { case message: String => messages += message } + + def receive: Actor.Receive = storeMessage.andThen({ + case _ => throw new RuntimeException() + }) + } + + class CounterTestActor extends TestActor with ReceiveCounterActor + + class CounterTestActorNameOverride extends TestActor with ReceiveCounterActor { + override def receiveCounterName = "overrideReceiveCounter" + } + + class TimerTestActor extends TestActor with ReceiveTimerActor + + class ExceptionMeterTestActor extends ExceptionThrowingTestActor with ReceiveExceptionMeterActor + + class ComposedActor extends ExceptionThrowingTestActor + with ReceiveCounterActor with ReceiveTimerActor with ReceiveExceptionMeterActor + +}