Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

creating a pekko metrics libary in the manner of the akka 26 one #483

Merged
merged 2 commits into from
Aug 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ target/
.project
.settings/
.cache
.bsp/
.bsp/
/bin/
17 changes: 16 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ lazy val commonSettings = Seq(
ThisBuild / publishTo := sonatypePublishTo.value

lazy val root = (project in file("."))
.aggregate(metricsScala, metricsScalaHdr, metricsAkka25, metricsAkka26)
.aggregate(metricsScala, metricsScalaHdr, metricsAkka25, metricsAkka26, metricsPekko)
.settings(
crossScalaVersions := Nil,
publishArtifact := false,
Expand Down Expand Up @@ -77,6 +77,21 @@ lazy val metricsScalaHdr = (project in file("metrics-scala-hdr"))
mimaPreviousArtifacts := mimaPrevious(scalaVersion.value)
)

lazy val metricsPekko = (project in file("metrics-pekko"))
.dependsOn(metricsScala)
.settings(
commonSettings,
crossScalaVersions := Seq("3.1.3", "2.13.11", "2.12.18"),
name := "metrics4-pekko",
description := "metrics-scala for pekko 1.0.1 and Scala " + CrossVersion.binaryScalaVersion(scalaVersion.value),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % "1.0.1",
"org.apache.pekko" %% "pekko-testkit" % "1.0.1" % Test
),
sourceDirectory := baseDirectory.value.getParentFile / "metrics-pekko" / "src",
mimaPreviousArtifacts := mimaPrevious(scalaVersion.value)
)

lazy val metricsAkka26 = (project in file("metrics-akka-26"))
.dependsOn(metricsScala)
.settings(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright (c) 2013-2022 Erik van Oosten
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nl.grons.metrics4.scala

import org.apache.pekko.actor.Actor

/**
* Stackable [[Actor]] trait which links [[Gauge]] life cycles with the actor life cycle.
*
* When an actor is restarted, gauges can not be created again under the same name in the same metric registry.
* By mixing in this trait, all gauges created in this actor will be automatically unregistered before this actor
* restarts.
*
* Use it as follows:
* {{{
* object Application {
* // The application wide metrics registry.
* val metricRegistry = new com.codahale.metrics.MetricRegistry()
* }
* trait Instrumented extends InstrumentedBuilder {
* val metricRegistry = Application.metricRegistry
* }
*
* class ExampleActor extends Actor with Instrumented with ActorInstrumentedLifecycle {
*
* var counter = 0
*
* // The following gauge will automatically unregister before a restart of this actor.
* metrics.gauge("sample-gauge"){
* counter
* }
*
* override def receive = {
* case 'increment =>
* counter += 1
* doWork()
* }
*
* def doWork(): Unit = {
* // etc etc etc
* }
* }
*
* }}}
*/
trait ActorInstrumentedLifeCycle extends Actor { self: InstrumentedBuilder =>

abstract override def preRestart(reason: Throwable, message: Option[Any]) = {
metrics.unregisterGauges()
super.preRestart(reason, message)
}

}

/**
* Stackable actor trait which counts received messages.
*
* Metric name defaults to the class of the actor (e.g. `ExampleActor` below) + .`receiveCounter`
*
* Use it as follows:
* {{{
* object Application {
* // The application wide metrics registry.
* val metricRegistry = new com.codahale.metrics.MetricRegistry()
* }
* trait Instrumented extends InstrumentedBuilder {
* val metricRegistry = Application.metricRegistry
* }
*
* class ExampleActor extends Actor {
*
* def receive = {
* case _ => doWork()
* }
* }
*
* class InstrumentedExampleActor extends ExampleActor with ReceiveCounterActor with Instrumented
*
* }}}
*/
trait ReceiveCounterActor extends Actor { self: InstrumentedBuilder =>

def receiveCounterName: String = "receiveCounter"
lazy val counter: Counter = metrics.counter(receiveCounterName)

private[this] lazy val wrapped = counter.count(super.receive)

abstract override def receive = wrapped

}

/**
* Stackable actor trait which times the message receipt.
*
* Metric name defaults to the class of the actor (e.g. `ExampleActor` below) + `.receiveTimer`
*
* Use it as follows:
* {{{
* object Application {
* // The application wide metrics registry.
* val metricRegistry = new com.codahale.metrics.MetricRegistry()
* }
* trait Instrumented extends InstrumentedBuilder {
* val metricRegistry = Application.metricRegistry
* }
*
* class ExampleActor extends Actor {
*
* def receive = {
* case _ => doWork()
* }
* }
*
* class InstrumentedExampleActor extends ExampleActor with ReceiveCounterActor with Instrumented
*
* }}}
*/
trait ReceiveTimerActor extends Actor { self: InstrumentedBuilder =>

def receiveTimerName: String = "receiveTimer"
lazy val timer: Timer = metrics.timer(receiveTimerName)

private[this] lazy val wrapped = timer.timePF(super.receive)

abstract override def receive = wrapped
}

/**
* Stackable actor trait which meters thrown exceptions.
*
* Metric name defaults to the class of the actor (e.g. `ExampleActor` below) + `.receiveExceptionMeter`
*
* Use it as follows:
* {{{
* object Application {
* // The application wide metrics registry.
* val metricRegistry = new com.codahale.metrics.MetricRegistry()
* }
* trait Instrumented extends InstrumentedBuilder {
* val metricRegistry = Application.metricRegistry
* }
*
* class ExampleActor extends Actor {
*
* def receive = {
* case _ => doWork()
* }
* }
*
* class InstrumentedExampleActor extends ExampleActor with ReceiveCounterActor with Instrumented
*
* }}}
*/
trait ReceiveExceptionMeterActor extends Actor { self: InstrumentedBuilder =>

def receiveExceptionMeterName: String = "receiveExceptionMeter"
lazy val meter: Meter = metrics.meter(receiveExceptionMeterName)

private[this] lazy val wrapped = meter.exceptionMarkerPF(super.receive)

abstract override def receive = wrapped

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2013-2022 Erik van Oosten
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nl.grons.metrics4.scala

import org.apache.pekko.actor.{Actor, ActorSystem, Props}
import org.apache.pekko.testkit._
import com.codahale.metrics._
import com.typesafe.config.ConfigFactory
import nl.grons.metrics4.scala.ActorInstrumentedLifeCycleSpec._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.funspec.AsyncFunSpecLike
import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters._
import scala.concurrent.Promise

object ActorInstrumentedLifeCycleSpec {

// Don't log all those intentional exceptions
val NonLoggingActorSystem = ActorSystem("lifecycle-spec", ConfigFactory.parseMap(Map("pekko.loglevel" -> "OFF").asJava))

val TestMetricRegistry = new com.codahale.metrics.MetricRegistry()

trait Instrumented extends InstrumentedBuilder {
val metricRegistry: MetricRegistry = TestMetricRegistry
}

class ExampleActor(restarted: Promise[Boolean]) extends Actor with Instrumented with ActorInstrumentedLifeCycle {

var counter = 0

// The following gauge will automatically unregister before a restart of this actor.
metrics.gauge("sample-gauge") {
counter
}

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
self ! Symbol("prerestart")
super.preRestart(reason, message)
}

def receive: Receive = {
case Symbol("increment") =>
counter += 1
case Symbol("get") =>
sender() ! counter
case Symbol("error") =>
throw new RuntimeException("BOOM!")
case Symbol("prerestart") =>
restarted.success(true)
}

}
}

class ActorInstrumentedLifeCycleSpec extends TestKit(NonLoggingActorSystem) with AsyncFunSpecLike with ImplicitSender with Matchers with ScalaFutures with BeforeAndAfterAll {

def report: Option[Any] = {
case class NameFilter(prefix: String) extends MetricFilter {
override def matches(name: String, metric: Metric): Boolean = name.startsWith(prefix)
}

TestMetricRegistry
.getGauges(NameFilter("nl.grons.metrics4.scala")).asScala
.headOption
.map { case (_, g) => g.getValue }
}

describe("an actor with gauges needing lifecycle management") {
val restartedPromise = Promise[Boolean]()
val ar = system.actorOf(Props(new ExampleActor(restartedPromise)))

it("correctly builds a gauge that reports the correct value") {
ar ! Symbol("increment")
ar ! Symbol("increment")
ar ! Symbol("increment")
ar ! Symbol("get")
expectMsg(3)
report shouldBe Some(3)
}

it("rebuilds the gauge on actor restart") {
ar ! Symbol("increment")
ar ! Symbol("increment")
ar ! Symbol("error")
ar ! Symbol("increment")
ar ! Symbol("get")
expectMsg(1)
whenReady(restartedPromise.future)(_ shouldBe true)
report shouldBe Some(1)
}
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
}
Loading