Commit

Parse events from bytes instead of string (close #10)
* New `Event.parseBytes` method from analytics-sdk to parse events from bytes directly.
* New `_schema_version` metafield for contexts
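
For context, a minimal sketch of the before/after parsing paths (assuming the raw payload arrives as a java.nio.ByteBuffer, as the diff below suggests; the helper names parseViaString and parseViaBytes are illustrative only, not part of this commit):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

// Sketch only: assumes a UTF-8 encoded ByteBuffer payload, as in the diff below.

// Before: materialise the whole payload as a String, then parse it
def parseViaString(bytes: ByteBuffer) = {
  val stringified = StandardCharsets.UTF_8.decode(bytes).toString
  Event.parse(stringified).toEither
}

// After: hand the bytes straight to the SDK, skipping the intermediate String
def parseViaBytes(bytes: ByteBuffer) =
  Event.parseBytes(bytes).toEither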
pondzix committed Dec 12, 2023
1 parent ceb01e5 commit ece3d95
Showing 4 changed files with 11 additions and 13 deletions.
@@ -15,16 +15,14 @@ object AppHealth {
     snowflakeHealth: SnowflakeHealth[F]
   ): F[HealthProbe.Status] =
     List(
-      latencyHealth(config, source),
+      sourceIsHealthy(config, source),
       snowflakeHealth.state.get
     ).foldA
 
-  private def latencyHealth[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
-    source.processingLatency.map { latency =>
-      if (latency > config.unhealthyLatency)
-        Unhealthy(show"Processing latency is $latency")
-      else
-        Healthy
+  private def sourceIsHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
+    source.isHealthy(config.unhealthyLatency).map {
+      case SourceAndAck.Healthy => HealthProbe.Healthy
+      case unhealthy: SourceAndAck.Unhealthy => HealthProbe.Unhealthy(unhealthy.show)
     }
 
   private val combineHealth: (HealthProbe.Status, HealthProbe.Status) => HealthProbe.Status = {
@@ -155,9 +155,8 @@ object Processing {
   numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk))
   (badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes =>
     Sync[F].delay {
-      val stringified = StandardCharsets.UTF_8.decode(bytes).toString
-      Event.parse(stringified).toEither.leftMap { case failure =>
-        val payload = BadRowRawPayload(stringified)
+      Event.parseBytes(bytes).toEither.leftMap { failure =>
+        val payload = BadRowRawPayload(StandardCharsets.UTF_8.decode(bytes).toString)
         BadRow.LoaderParsingError(badProcessor, failure, payload)
       }
     }
@@ -17,7 +17,7 @@ import com.snowplowanalytics.snowplow.sinks.Sink
 import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
 import com.snowplowanalytics.snowplow.runtime.AppInfo
 
-import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
 case class MockEnvironment(state: Ref[IO, Vector[MockEnvironment.Action]], environment: Environment[IO])

@@ -94,7 +94,8 @@ object MockEnvironment {
         }
         .drain
 
-      def processingLatency: IO[FiniteDuration] = IO.pure(Duration.Zero)
+      override def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
+        IO.pure(SourceAndAck.Healthy)
     }
 
   private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch =>
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -28,7 +28,7 @@ object Dependencies {
     val awsSdk2 = "2.20.135"
 
     // Snowplow
-    val streams = "0.2.0-M2"
+    val streams = "0.2.0"
 
     // tests
     val specs2 = "4.20.0"
