Use new badrows serializer checking max record size (close #16)
pondzix authored and istreeter committed Feb 18, 2024
1 parent 7ac5bfe commit ceb4653
Showing 5 changed files with 31 additions and 13 deletions.
@@ -39,7 +39,14 @@ case class Config[+Source, +Sink](

object Config {

-  case class Output[+Sink](good: Snowflake, bad: Sink)
+  case class Output[+Sink](
+    good: Snowflake,
+    bad: SinkWithMaxSize[Sink]
+  )
+
+  case class MaxRecordSize(maxRecordSize: Int)
+
+  case class SinkWithMaxSize[+Sink](sink: Sink, maxRecordSize: Int)

  case class Snowflake(
    url: SnowflakeURL,
@@ -92,8 +99,12 @@ object Config {
      Try(new SnowflakeURL(str))
    }
    implicit val snowflake = deriveConfiguredDecoder[Snowflake]
-    implicit val output = deriveConfiguredDecoder[Output[Sink]]
-    implicit val batching = deriveConfiguredDecoder[Batching]
+    implicit val sinkWithMaxSize = for {
+      sink <- Decoder[Sink]
+      maxSize <- deriveConfiguredDecoder[MaxRecordSize]
+    } yield SinkWithMaxSize(sink, maxSize.maxRecordSize)
+    implicit val output = deriveConfiguredDecoder[Output[Sink]]
+    implicit val batching = deriveConfiguredDecoder[Batching]
    implicit val sentryDecoder = deriveConfiguredDecoder[SentryM[Option]]
      .map[Option[Sentry]] {
        case SentryM(Some(dsn), tags) =>
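The new sinkWithMaxSize decoder leans on circe's Decoder being monadic: Decoder[Sink] and the derived MaxRecordSize decoder both run against the same JSON cursor, so a single output.bad object carries the sink's own fields alongside maxRecordSize. Below is a minimal, self-contained sketch of the same pattern; it uses plain semiauto derivation and a hypothetical FakeSink stand-in rather than the loader's real Sink type and configured decoders.

import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder
import io.circe.parser.decode

// Hypothetical stand-ins: the real loader uses its own Sink type and
// deriveConfiguredDecoder, not plain semiauto derivation.
final case class FakeSink(streamName: String)
final case class MaxRecordSize(maxRecordSize: Int)
final case class SinkWithMaxSize(sink: FakeSink, maxRecordSize: Int)

object SinkDecoderSketch {
  implicit val fakeSinkDecoder: Decoder[FakeSink] = deriveDecoder
  implicit val maxRecordSizeDecoder: Decoder[MaxRecordSize] = deriveDecoder

  // Same shape as the diff: both decoders read the same object, so the sink
  // fields and maxRecordSize must sit side by side in the JSON.
  implicit val sinkWithMaxSizeDecoder: Decoder[SinkWithMaxSize] = for {
    sink    <- Decoder[FakeSink]
    maxSize <- Decoder[MaxRecordSize]
  } yield SinkWithMaxSize(sink, maxSize.maxRecordSize)

  def main(args: Array[String]): Unit =
    println(decode[SinkWithMaxSize]("""{"streamName":"bad","maxRecordSize":1000000}"""))
    // prints: Right(SinkWithMaxSize(FakeSink(bad),1000000))
}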
@@ -29,7 +29,8 @@ case class Environment[F[_]](
  channel: Coldswap[F, Channel[F]],
  metrics: Metrics[F],
  batching: Config.Batching,
-  schemasToSkip: List[SchemaCriterion]
+  schemasToSkip: List[SchemaCriterion],
+  badRowMaxSize: Int
)

object Environment {
@@ -50,7 +51,7 @@
      )
      httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
      monitoring <- Monitoring.create[F](config.monitoring.webhook, appInfo, httpClient)
-      badSink <- toSink(config.output.bad)
+      badSink <- toSink(config.output.bad.sink)
      metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
      tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring))
      channelResource <- Channel.make(config.output.good, snowflakeHealth, config.batching, config.retries, monitoring)
@@ -64,6 +65,7 @@
      channel = channelColdswap,
      metrics = metrics,
      batching = config.batching,
-      schemasToSkip = config.skipSchemas
+      schemasToSkip = config.skipSchemas,
+      badRowMaxSize = config.output.bad.maxRecordSize
    )
  }
@@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics}
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
-import com.snowplowanalytics.snowplow.loaders.transform.Transform
+import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, Transform}

object Processing {
@@ -130,7 +130,7 @@
      .through(transform(badProcessor, env.schemasToSkip))
      .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
      .through(writeToSnowflake(env, badProcessor))
-      .through(sendFailedEvents(env))
+      .through(sendFailedEvents(env, badProcessor))
      .through(sendMetrics(env))
      .through(emitTokens)
  }
@@ -347,15 +347,19 @@
      env.tableManager.addColumns(extraColsRequired.toList)
    }

-  private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
+  private def sendFailedEvents[F[_]: Applicative](
+    env: Environment[F],
+    badRowProcessor: BadRowProcessor
+  ): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
    _.evalTap { batch =>
      if (batch.badAccumulated.nonEmpty) {
-        val serialized = batch.badAccumulated.mapUnordered(_.compactByteArray)
+        val serialized =
+          batch.badAccumulated.mapUnordered(badRow => BadRowsSerializer.withMaxSize(badRow, badRowProcessor, env.badRowMaxSize))
        env.badSink.sinkSimple(serialized)
      } else Applicative[F].unit
    }

-  private def sendMetrics[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
+  private def sendMetrics[F[_]: Applicative](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
    _.evalTap { batch =>
      val countBad = batch.badAccumulated.asIterable.size
      env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad)
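sendFailedEvents now routes each bad row through BadRowsSerializer.withMaxSize instead of calling compactByteArray directly, so the serialized payload is checked against env.badRowMaxSize before it reaches the bad sink. The serializer's implementation is not part of this diff (it presumably comes with the bumped streams dependency); the sketch below only illustrates the assumed idea — serialize, measure, and fall back to a small summary when a record is oversized — and every name in it is hypothetical. The real serializer presumably emits a proper size-violation bad row rather than this ad-hoc placeholder JSON.

import java.nio.charset.StandardCharsets

// Illustrative sketch only, NOT the common-streams BadRowsSerializer: enforce a
// maximum serialized record size by replacing oversized payloads with a tiny summary.
object MaxSizeSerializerSketch {

  def serializeWithMaxSize(badRowJson: String, maxRecordSize: Int): Array[Byte] = {
    val bytes = badRowJson.getBytes(StandardCharsets.UTF_8)
    if (bytes.length <= maxRecordSize)
      bytes // within the limit: pass through unchanged
    else
      // over the limit: keep only a small record describing the violation
      oversizedSummary(bytes.length, maxRecordSize).getBytes(StandardCharsets.UTF_8)
  }

  private def oversizedSummary(actualSize: Int, maxSize: Int): String =
    s"""{"error":"bad row exceeded max record size","actualSizeBytes":$actualSize,"maxAllowedSizeBytes":$maxSize}"""
}

With badRowMaxSize wired from config.output.bad.maxRecordSize (and defaulted to 1000000 bytes in MockEnvironment), an oversized bad row is cut down before it is handed to a sink that enforces a per-record size limit.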
@@ -70,7 +70,8 @@ object MockEnvironment {
        maxDelay = 10.seconds,
        uploadConcurrency = 1
      ),
-      schemasToSkip = List.empty
+      schemasToSkip = List.empty,
+      badRowMaxSize = 1000000
    )
    MockEnvironment(state, env)
  }
project/Dependencies.scala (2 changes: 1 addition, 1 deletion)
@@ -30,7 +30,7 @@ object Dependencies {
    val awsSdk2 = "2.20.135"

    // Snowplow
-    val streams = "0.2.0"
+    val streams = "0.3.0-M2"

    // tests
    val specs2 = "4.20.0"