diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index 686bf60..c327bad 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -74,6 +74,9 @@ # Max size of the batch in bytes before emitting # Default is 5MB "maxBatchBytes": 5000000 + + # Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded. + "reportPeriod": 10 seconds } } diff --git a/config/config.local.reference.hocon b/config/config.local.reference.hocon index a13c988..a004937 100644 --- a/config/config.local.reference.hocon +++ b/config/config.local.reference.hocon @@ -42,6 +42,9 @@ "type": "Local" # Path for bad row sink. "path": "./tmp/bad" + + # Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded. + "reportPeriod": 10 seconds } } diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index d8bc6f8..1cdf90e 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -63,6 +63,9 @@ # The number of threads used internally by library to process the callback after message delivery # Default is 1 "numCallbackExecutors": 1 + + # Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded. + "reportPeriod": 10 seconds } } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/DummyStreamSink.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/DummyStreamSink.scala index 79292e7..d63376d 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/DummyStreamSink.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/DummyStreamSink.scala @@ -12,9 +12,33 @@ */ package com.snowplowanalytics.snowplow.postgres.streaming -import cats.effect.{Resource, Concurrent} +import org.log4s.getLogger + +import cats.effect.{Resource, Concurrent, Sync, Timer} +import cats.effect.concurrent.Ref + +import fs2.Stream + +import scala.concurrent.duration.FiniteDuration object DummyStreamSink { - def create[F[_]: Concurrent]:Resource[F, StreamSink[F]] = - Resource.pure[F, StreamSink[F]](_ => Concurrent[F].pure(())) + def create[F[_]: Concurrent: Timer](period: FiniteDuration): Resource[F, StreamSink[F]] = + for { + counter <- Resource.eval(Ref.of(0)) + _ <- Resource.make(Concurrent[F].start(reporter(counter, period)))(_.cancel) + } yield { _ => + counter.update(_ + 1) + } + + lazy val logger = getLogger + + private def reporter[F[_]: Sync: Timer](counter: Ref[F, Int], period: FiniteDuration): F[Unit] = + Stream.awakeDelay[F](period) + .evalMap(_ => counter.getAndSet(0)) + .evalMap { count => + if (count > 0) Sync[F].delay(logger.info(s"Discarded $count bad rows")) + else Sync[F].unit + } + .compile + .drain } diff --git a/modules/loader/src/main/resources/application.conf b/modules/loader/src/main/resources/application.conf index 8f6ed7d..1a684c6 100644 --- a/modules/loader/src/main/resources/application.conf +++ b/modules/loader/src/main/resources/application.conf @@ -22,6 +22,7 @@ } "bad": { "type": "Noop" + "reportPeriod": 30 seconds "delayThreshold": 200 milliseconds "maxBatchSize": 500 "maxBatchBytes": 5000000 diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala index 987df48..a7a5644 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala @@ -234,7 +234,7 @@ object LoaderConfig { sealed trait StreamSink extends Product with Serializable object StreamSink { - case object Noop extends StreamSink + case class Noop(reportPeriod: FiniteDuration) extends StreamSink case class Local(path: PathInfo) extends StreamSink diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/Environment.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/Environment.scala index 433ef67..60020ce 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/Environment.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/Environment.scala @@ -42,7 +42,7 @@ object Environment { blocker: Blocker) = for { badSink <- config.output.bad match { - case LoaderConfig.StreamSink.Noop => DummyStreamSink.create + case LoaderConfig.StreamSink.Noop(period) => DummyStreamSink.create(period) case c: LoaderConfig.StreamSink.Kinesis => KinesisSink.create(c, config.monitoring, config.backoffPolicy, blocker) case c: LoaderConfig.StreamSink.PubSub => PubSubSink.create(c, config.backoffPolicy) case c: LoaderConfig.StreamSink.Local => LocalSink.create(c, blocker) diff --git a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala index eaeee7f..120a1c4 100644 --- a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala +++ b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala @@ -57,7 +57,7 @@ class CliSpec extends Specification { 10, None ), - LoaderConfig.StreamSink.Noop + LoaderConfig.StreamSink.Noop(30.seconds) ), Purpose.Enriched, Monitoring(Monitoring.Metrics(true)), @@ -136,7 +136,7 @@ class CliSpec extends Specification { 10, None ), - LoaderConfig.StreamSink.Noop + LoaderConfig.StreamSink.Noop(30.seconds) ), Purpose.Enriched, Monitoring(Monitoring.Metrics(true)), @@ -209,7 +209,7 @@ class CliSpec extends Specification { 10, None ), - LoaderConfig.StreamSink.Noop + LoaderConfig.StreamSink.Noop(30.seconds) ), Purpose.Enriched, Monitoring(Monitoring.Metrics(true)), @@ -271,7 +271,7 @@ class CliSpec extends Specification { 10, None ), - LoaderConfig.StreamSink.Noop + LoaderConfig.StreamSink.Noop(30.seconds) ), Purpose.Enriched, Monitoring(Monitoring.Metrics(true)), diff --git a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/LocalSourceSpec.scala b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/LocalSourceSpec.scala index 8116078..41426d6 100644 --- a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/LocalSourceSpec.scala +++ b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/LocalSourceSpec.scala @@ -57,7 +57,7 @@ class LocalSourceSpec extends Database { 10, None ), - LoaderConfig.StreamSink.Noop + LoaderConfig.StreamSink.Noop(30.seconds) ), Purpose.Enriched, Monitoring(Monitoring.Metrics(false)),