diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala index 236d0bc..86d8a0b 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AppHealth.scala @@ -15,16 +15,14 @@ object AppHealth { snowflakeHealth: SnowflakeHealth[F] ): F[HealthProbe.Status] = List( - latencyHealth(config, source), + sourceIsHealthy(config, source), snowflakeHealth.state.get ).foldA - private def latencyHealth[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] = - source.processingLatency.map { latency => - if (latency > config.unhealthyLatency) - Unhealthy(show"Processing latency is $latency") - else - Healthy + private def sourceIsHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] = + source.isHealthy(config.unhealthyLatency).map { + case SourceAndAck.Healthy => HealthProbe.Healthy + case unhealthy: SourceAndAck.Unhealthy => HealthProbe.Unhealthy(unhealthy.show) } private val combineHealth: (HealthProbe.Status, HealthProbe.Status) => HealthProbe.Status = { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index e15b066..ca795e3 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -155,9 +155,8 @@ object Processing { numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk)) (badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes => Sync[F].delay { - val stringified = StandardCharsets.UTF_8.decode(bytes).toString - Event.parse(stringified).toEither.leftMap { case failure => - val payload = BadRowRawPayload(stringified) + Event.parseBytes(bytes).toEither.leftMap { failure => + val payload = BadRowRawPayload(StandardCharsets.UTF_8.decode(bytes).toString) BadRow.LoaderParsingError(badProcessor, failure, payload) } } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index 989edbf..ba8d841 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -17,7 +17,7 @@ import com.snowplowanalytics.snowplow.sinks.Sink import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager} import com.snowplowanalytics.snowplow.runtime.AppInfo -import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} +import scala.concurrent.duration.{DurationInt, FiniteDuration} case class MockEnvironment(state: Ref[IO, Vector[MockEnvironment.Action]], environment: Environment[IO]) @@ -94,7 +94,8 @@ object MockEnvironment { } .drain - def processingLatency: IO[FiniteDuration] = IO.pure(Duration.Zero) + override def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] = + IO.pure(SourceAndAck.Healthy) } private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch => diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c8f86a0..70fdfb2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,7 +28,7 @@ object Dependencies { val awsSdk2 = "2.20.135" // Snowplow - val streams = "0.2.0-M2" + val streams = "0.2.0" // tests val specs2 = "4.20.0"