Skip to content

Commit

Permalink
Warn about bad rows even in DummyStream sink (close #64)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Oct 30, 2021
1 parent bdb073f commit 96b9445
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 10 deletions.
3 changes: 3 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
# Max size of the batch in bytes before emitting
# Default is 5MB
"maxBatchBytes": 5000000

# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config.local.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
"type": "Local"
# Path for bad row sink.
"path": "./tmp/bad"

# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
# The number of threads used internally by library to process the callback after message delivery
# Default is 1
"numCallbackExecutors": 1

# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,33 @@
*/
package com.snowplowanalytics.snowplow.postgres.streaming

import cats.effect.{Resource, Concurrent}
import org.log4s.getLogger

import cats.effect.{Resource, Concurrent, Sync, Timer}
import cats.effect.concurrent.Ref

import fs2.Stream

import scala.concurrent.duration.FiniteDuration

object DummyStreamSink {
// Builds a sink that ignores whatever is passed to it and immediately completes with unit.
// NOTE(review): in this flattened diff view this parameterless variant appears to be the
// pre-change definition superseded by `create(period)` — confirm against the repository.
def create[F[_]: Concurrent]:Resource[F, StreamSink[F]] =
Resource.pure[F, StreamSink[F]](_ => Concurrent[F].pure(()))
/**
 * Builds a sink that drops every bad row while counting it, and starts a background
 * reporter that periodically logs how many rows were dropped.
 *
 * @param period interval between two consecutive reports
 * @return a resource yielding the counting sink; releasing it cancels the reporter fiber
 */
def create[F[_]: Concurrent: Timer](period: FiniteDuration): Resource[F, StreamSink[F]] =
  Resource.eval(Ref.of[F, Int](0)).flatMap { discarded =>
    // The reporter fiber is started on acquire and cancelled on release.
    val background = Resource.make(Concurrent[F].start(reporter(discarded, period)))(_.cancel)
    background.map { _ =>
      // The row itself is ignored; only the running tally is bumped.
      val sink: StreamSink[F] = _ => discarded.update(_ + 1)
      sink
    }
  }

// Logger obtained from log4s `getLogger`; `reporter` uses it to log the discarded-row counts.
lazy val logger = getLogger

/**
 * Periodic reporting loop: on every tick it reads the counter while resetting it to zero
 * and, when at least one row was counted, logs the total at info level.
 *
 * @param counter number of bad rows discarded since the previous tick
 * @param period  delay between two consecutive ticks
 */
private def reporter[F[_]: Sync: Timer](counter: Ref[F, Int], period: FiniteDuration): F[Unit] = {
  // Logging is suspended in F so it only runs when the stream evaluates it.
  def announce(count: Int): F[Unit] =
    Sync[F].delay(logger.info(s"Discarded $count bad rows"))

  Stream
    .awakeDelay[F](period)
    .evalMap(_ => counter.getAndSet(0))
    // Ticks during which nothing was discarded stay silent.
    .filter(_ > 0)
    .evalMap(n => announce(n))
    .compile
    .drain
}
}
1 change: 1 addition & 0 deletions modules/loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
}
"bad": {
"type": "Noop"
"reportPeriod": 30 seconds
"delayThreshold": 200 milliseconds
"maxBatchSize": 500
"maxBatchBytes": 5000000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ object LoaderConfig {
sealed trait StreamSink extends Product with Serializable
object StreamSink {

case object Noop extends StreamSink
// Bad-row sink configuration that maps to `DummyStreamSink`; `reportPeriod` is how often the
// number of discarded bad rows is logged.
case class Noop(reportPeriod: FiniteDuration) extends StreamSink

case class Local(path: PathInfo) extends StreamSink

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object Environment {
blocker: Blocker) =
for {
badSink <- config.output.bad match {
case LoaderConfig.StreamSink.Noop => DummyStreamSink.create
case LoaderConfig.StreamSink.Noop(period) => DummyStreamSink.create(period)
case c: LoaderConfig.StreamSink.Kinesis => KinesisSink.create(c, config.monitoring, config.backoffPolicy, blocker)
case c: LoaderConfig.StreamSink.PubSub => PubSubSink.create(c, config.backoffPolicy)
case c: LoaderConfig.StreamSink.Local => LocalSink.create(c, blocker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class CliSpec extends Specification {
10,
None
),
LoaderConfig.StreamSink.Noop
LoaderConfig.StreamSink.Noop(30.seconds)
),
Purpose.Enriched,
Monitoring(Monitoring.Metrics(true)),
Expand Down Expand Up @@ -136,7 +136,7 @@ class CliSpec extends Specification {
10,
None
),
LoaderConfig.StreamSink.Noop
LoaderConfig.StreamSink.Noop(30.seconds)
),
Purpose.Enriched,
Monitoring(Monitoring.Metrics(true)),
Expand Down Expand Up @@ -209,7 +209,7 @@ class CliSpec extends Specification {
10,
None
),
LoaderConfig.StreamSink.Noop
LoaderConfig.StreamSink.Noop(30.seconds)
),
Purpose.Enriched,
Monitoring(Monitoring.Metrics(true)),
Expand Down Expand Up @@ -271,7 +271,7 @@ class CliSpec extends Specification {
10,
None
),
LoaderConfig.StreamSink.Noop
LoaderConfig.StreamSink.Noop(30.seconds)
),
Purpose.Enriched,
Monitoring(Monitoring.Metrics(true)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class LocalSourceSpec extends Database {
10,
None
),
LoaderConfig.StreamSink.Noop
LoaderConfig.StreamSink.Noop(30.seconds)
),
Purpose.Enriched,
Monitoring(Monitoring.Metrics(false)),
Expand Down

0 comments on commit 96b9445

Please sign in to comment.