From 418f5edd1c5b366a9eb48fee47a67ec39a48d414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Mon, 1 Jan 2024 12:16:28 +0100 Subject: [PATCH] Use new badrows serializer checking max record size (close #16) --- .../Config.scala | 17 ++++++++++++++--- .../Environment.scala | 8 +++++--- .../processing/Processing.scala | 14 +++++++++----- .../MockEnvironment.scala | 3 ++- project/Dependencies.scala | 2 +- 5 files changed, 31 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index 4c97c9a..dacc1bc 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -36,7 +36,14 @@ case class Config[+Source, +Sink]( object Config { - case class Output[+Sink](good: Snowflake, bad: Sink) + case class Output[+Sink]( + good: Snowflake, + bad: SinkWithMaxSize[Sink] + ) + + case class MaxRecordSize(maxRecordSize: Int) + + case class SinkWithMaxSize[+Sink](sink: Sink, maxRecordSize: Int) case class Snowflake( url: SnowflakeURL, @@ -89,8 +96,12 @@ object Config { Try(new SnowflakeURL(str)) } implicit val snowflake = deriveConfiguredDecoder[Snowflake] - implicit val output = deriveConfiguredDecoder[Output[Sink]] - implicit val batching = deriveConfiguredDecoder[Batching] + implicit val sinkWithMaxSize = for { + sink <- Decoder[Sink] + maxSize <- deriveConfiguredDecoder[MaxRecordSize] + } yield SinkWithMaxSize(sink, maxSize.maxRecordSize) + implicit val output = deriveConfiguredDecoder[Output[Sink]] + implicit val batching = deriveConfiguredDecoder[Batching] implicit val sentryDecoder = deriveConfiguredDecoder[SentryM[Option]] .map[Option[Sentry]] { case SentryM(Some(dsn), tags) => diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index 682fa1e..4b81a03 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -26,7 +26,8 @@ case class Environment[F[_]]( channel: Coldswap[F, Channel[F]], metrics: Metrics[F], batching: Config.Batching, - schemasToSkip: List[SchemaCriterion] + schemasToSkip: List[SchemaCriterion], + badRowMaxSize: Int ) object Environment { @@ -47,7 +48,7 @@ object Environment { ) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource monitoring <- Monitoring.create[F](config.monitoring.webhook, appInfo, httpClient) - badSink <- toSink(config.output.bad) + badSink <- toSink(config.output.bad.sink) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) tableManager <- Resource.eval(TableManager.make(config.output.good, snowflakeHealth, config.retries, monitoring)) channelResource <- Channel.make(config.output.good, snowflakeHealth, config.batching, config.retries, monitoring) @@ -61,6 +62,7 @@ object Environment { channel = channelColdswap, metrics = metrics, batching = config.batching, - schemasToSkip = config.skipSchemas + schemasToSkip = config.skipSchemas, + badRowMaxSize = config.output.bad.maxRecordSize ) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index e62f3ef..496716b 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -28,7 +28,7 @@ import com.snowplowanalytics.snowplow.sinks.ListOfList import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics} import com.snowplowanalytics.snowplow.runtime.syntax.foldable._ import com.snowplowanalytics.snowplow.runtime.processing.BatchUp -import com.snowplowanalytics.snowplow.loaders.transform.Transform +import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, Transform} object Processing { @@ -127,7 +127,7 @@ object Processing { .through(transform(badProcessor, env.schemasToSkip)) .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay)) .through(writeToSnowflake(env, badProcessor)) - .through(sendFailedEvents(env)) + .through(sendFailedEvents(env, badProcessor)) .through(sendMetrics(env)) .through(emitTokens) } @@ -344,15 +344,19 @@ object Processing { env.tableManager.addColumns(extraColsRequired.toList) } - private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] = + private def sendFailedEvents[F[_]: Applicative]( + env: Environment[F], + badRowProcessor: BadRowProcessor + ): Pipe[F, BatchAfterTransform, BatchAfterTransform] = _.evalTap { batch => if (batch.badAccumulated.nonEmpty) { - val serialized = batch.badAccumulated.mapUnordered(_.compactByteArray) + val serialized = + batch.badAccumulated.mapUnordered(badRow => BadRowsSerializer.withMaxSize(badRow, badRowProcessor, env.badRowMaxSize)) env.badSink.sinkSimple(serialized) } else Applicative[F].unit } - private def sendMetrics[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] = + private def sendMetrics[F[_]: Applicative](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] = _.evalTap { batch => val countBad = batch.badAccumulated.asIterable.size env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index 10d1600..b7cd2ca 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -67,7 +67,8 @@ object MockEnvironment { maxDelay = 10.seconds, uploadConcurrency = 1 ), - schemasToSkip = List.empty + schemasToSkip = List.empty, + badRowMaxSize = 1000000 ) MockEnvironment(state, env) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 02f1199..2d1de08 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,7 +28,7 @@ object Dependencies { val awsSdk2 = "2.20.135" // Snowplow - val streams = "0.2.0" + val streams = "0.3.0-M2" // tests val specs2 = "4.20.0"