From 798b621bfd3e94ae66c28ba615b1b09da7fac5dc Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 24 Oct 2023 20:21:25 +0100 Subject: [PATCH] Add latency metric --- .../Metrics.scala | 17 ++++++++-- .../processing/Processing.scala | 19 +++++++++-- .../MockEnvironment.scala | 4 +++ .../processing/ProcessingSpec.scala | 33 +++++++++++++++++++ project/Dependencies.scala | 9 +++-- 5 files changed, 74 insertions(+), 8 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala index d811b77..8efa9d6 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala @@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics} trait Metrics[F[_]] { def addGood(count: Int): F[Unit] def addBad(count: Int): F[Unit] + def setLatencyMillis(latencyMillis: Long): F[Unit] def report: Stream[F, Nothing] } @@ -28,17 +29,19 @@ object Metrics { private case class State( good: Int, - bad: Int + bad: Int, + latencyMillis: Long ) extends CommonMetrics.State { def toKVMetrics: List[CommonMetrics.KVMetric] = List( KVMetric.CountGood(good), - KVMetric.CountBad(bad) + KVMetric.CountBad(bad), + KVMetric.LatencyMillis(latencyMillis) ) } private object State { - def empty: State = State(0, 0) + def empty: State = State(0, 0, 0L) } private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] = @@ -47,6 +50,8 @@ object Metrics { ref.update(s => s.copy(good = s.good + count)) def addBad(count: Int): F[Unit] = ref.update(s => s.copy(bad = s.bad + count)) + def setLatencyMillis(latencyMillis: Long): F[Unit] = + ref.update(s => s.copy(latencyMillis = s.latencyMillis.max(latencyMillis))) } private object KVMetric { @@ -63,5 +68,11 @@ object Metrics { val metricType = CommonMetrics.MetricType.Count } + final case class LatencyMillis(v: Long) extends CommonMetrics.KVMetric { + val key = "latency_millis" + val value = v.toString + val metricType = CommonMetrics.MetricType.Gauge + } + } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index 1f4d4e7..aa65f67 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor} import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload} import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents} -import com.snowplowanalytics.snowplow.snowflake.{Config, Environment} +import com.snowplowanalytics.snowplow.snowflake.{Config, Environment, Metrics} import com.snowplowanalytics.snowplow.loaders.transform.Transform object Processing { @@ -107,7 +107,8 @@ object Processing { ): EventProcessor[F] = { in => val badProcessor = BadRowProcessor(env.appInfo.name, env.appInfo.version) - in.through(parseBytes(badProcessor)) + in.through(setLatency(env.metrics)) + .through(parseBytes(badProcessor)) .through(transform(badProcessor)) .through(batchUp(env.batching)) .through(writeToSnowflake(env, badProcessor)) @@ -116,6 +117,20 @@ object Processing { .through(emitTokens) } + private def setLatency[F[_]: Sync](metrics: Metrics[F]): Pipe[F, TokenedEvents, TokenedEvents] = + _.evalTap { + _.earliestSourceTstamp match { + case Some(t) => + for { + now <- Sync[F].realTime + latencyMillis = now.toMillis - t.toEpochMilli + _ <- metrics.setLatencyMillis(latencyMillis) + } yield () + case None => + Applicative[F].unit + } + } + /** Parse raw bytes into Event using analytics sdk */ private def parseBytes[F[_]: Monad](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] = _.evalMap { case TokenedEvents(list, token, _) => diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index 8ad2572..c5d773c 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -33,6 +33,7 @@ object MockEnvironment { case class WroteRowsToSnowflake(rowCount: Int) extends Action case class AddedGoodCountMetric(count: Int) extends Action case class AddedBadCountMetric(count: Int) extends Action + case class SetLatencyMetric(millis: Long) extends Action } import Action._ @@ -151,6 +152,9 @@ object MockEnvironment { def addGood(count: Int): IO[Unit] = ref.update(_ :+ AddedGoodCountMetric(count)) + def setLatencyMillis(latencyMillis: Long): IO[Unit] = + ref.update(_ :+ SetLatencyMetric(latencyMillis)) + def report: Stream[IO, Nothing] = Stream.never[IO] } } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala index 79b7eaf..4ba7599 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala @@ -11,10 +11,13 @@ import cats.effect.IO import fs2.Stream import org.specs2.Specification import cats.effect.testing.specs2.CatsEffect +import cats.effect.testkit.TestControl import net.snowflake.ingest.utils.{ErrorCode, SFException} import java.nio.charset.StandardCharsets import java.nio.ByteBuffer +import java.time.Instant +import scala.concurrent.duration.DurationLong import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.snowflake.MockEnvironment @@ -33,6 +36,7 @@ class ProcessingSpec extends Specification with CatsEffect { Emit BadRows when the ChannelProvider reports a problem with the data $e5 Abort processing and don't ack events when the ChannelProvider reports a runtime error $e6 Reset the Channel when the ChannelProvider reports the channel has become invalid $e7 + Set the latency metric based off the message timestamp $e8 """ def e1 = @@ -185,6 +189,35 @@ class ProcessingSpec extends Specification with CatsEffect { ) } + def e8 = { + val messageTime = Instant.parse("2023-10-24T10:00:00.000Z") + val processTime = Instant.parse("2023-10-24T10:00:42.123Z") + + val io = for { + inputs <- generateEvents.take(2).compile.toList.map { + _.map { + _.copy(earliestSourceTstamp = Some(messageTime)) + } + } + control <- MockEnvironment.build(inputs) + _ <- IO.sleep(processTime.toEpochMilli.millis) + _ <- Processing.stream(control.environment).compile.drain + state <- control.state.get + } yield state should beEqualTo( + Vector( + Action.SetLatencyMetric(42123), + Action.SetLatencyMetric(42123), + Action.WroteRowsToSnowflake(4), + Action.AddedGoodCountMetric(4), + Action.AddedBadCountMetric(0), + Action.Checkpointed(List(inputs(0).ack, inputs(1).ack)) + ) + ) + + TestControl.executeEmbed(io) + + } + } object ProcessingSpec { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b715438..27b2c37 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -11,6 +11,7 @@ object Dependencies { object V { // Scala + val catsEffect = "3.5.0" val catsRetry = "3.1.0" val http4s = "0.23.15" val decline = "2.4.1" @@ -27,7 +28,7 @@ object Dependencies { val awsSdk2 = "2.20.135" // Snowplow - val streams = "0.1.0-M6" + val streams = "0.1.0-M7" // tests val specs2 = "4.20.0" @@ -58,8 +59,9 @@ object Dependencies { val runtime = "com.snowplowanalytics" %% "runtime-common" % V.streams // tests - val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test - val catsEffectSpecs2 = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsEffectSpecs2 % Test + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + val catsEffectSpecs2 = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsEffectSpecs2 % Test + val catsEffectTestkit = "org.typelevel" %% "cats-effect-testkit" % V.catsEffect % Test val coreDependencies = Seq( streamsCore, @@ -74,6 +76,7 @@ object Dependencies { circeGenericExtra, specs2, catsEffectSpecs2, + catsEffectTestkit, slf4j % Test )