diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 6364817..d4b15cf 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -79,6 +79,14 @@ "uploadConcurrency": 1 } + # -- Schemas that won't be loaded to Snowflake. Optional, default value [] + "skipSchemas": [ + "iglu:com.acme/skipped1/jsonschema/1-0-0", + "iglu:com.acme/skipped2/jsonschema/1-0-*", + "iglu:com.acme/skipped3/jsonschema/1-*-*", + "iglu:com.acme/skipped4/jsonschema/*-*-*" + ] + "monitoring": { "metrics": { diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index c97dbd1..a6c7291 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -100,6 +100,14 @@ "uploadConcurrency": 1 } + # -- Schemas that won't be loaded to Snowflake. Optional, default value [] + "skipSchemas": [ + "iglu:com.acme/skipped1/jsonschema/1-0-0", + "iglu:com.acme/skipped2/jsonschema/1-0-*", + "iglu:com.acme/skipped3/jsonschema/1-*-*", + "iglu:com.acme/skipped4/jsonschema/*-*-*" + ] + "monitoring": { "metrics": { diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index 4330e72..3f879f7 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -93,6 +93,14 @@ "uploadConcurrency": 1 } + # -- Schemas that won't be loaded to Snowflake. Optional, default value [] + "skipSchemas": [ + "iglu:com.acme/skipped1/jsonschema/1-0-0", + "iglu:com.acme/skipped2/jsonschema/1-0-*", + "iglu:com.acme/skipped3/jsonschema/1-*-*", + "iglu:com.acme/skipped4/jsonschema/*-*-*" + ] + "monitoring": { "metrics": { diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index bbe2db8..2a8f11b 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -15,6 +15,8 @@ "maxDelay": "1 second" "uploadConcurrency": 3 } + + "skipSchemas": [] "monitoring": { "metrics": { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index 339da2f..33fa1df 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -14,10 +14,11 @@ import io.circe.generic.extras.Configuration import io.circe.config.syntax._ import net.snowflake.ingest.utils.SnowflakeURL import com.comcast.ip4s.Port +import com.snowplowanalytics.iglu.core.SchemaCriterion +import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder import scala.concurrent.duration.FiniteDuration import scala.util.Try - import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry} import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._ @@ -25,6 +26,7 @@ case class Config[+Source, +Sink]( input: Source, output: Config.Output[Sink], batching: Config.Batching, + skipSchemas: List[SchemaCriterion], telemetry: Telemetry.Config, monitoring: Config.Monitoring ) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index a4e7299..6993bcc 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -11,10 +11,10 @@ import cats.implicits._ import cats.Functor import cats.effect.{Async, Resource, Sync} import cats.effect.unsafe.implicits.global +import com.snowplowanalytics.iglu.core.SchemaCriterion import org.http4s.client.Client import org.http4s.blaze.client.BlazeClientBuilder import io.sentry.Sentry - import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sinks.Sink import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager} @@ -28,7 +28,8 @@ case class Environment[F[_]]( tblManager: TableManager[F], channelProvider: ChannelProvider[F], metrics: Metrics[F], - batching: Config.Batching + batching: Config.Batching, + schemasToSkip: List[SchemaCriterion] ) object Environment { @@ -58,7 +59,8 @@ object Environment { tblManager = tblManager, channelProvider = channelProvider, metrics = metrics, - batching = config.batching + batching = config.batching, + schemasToSkip = config.skipSchemas ) private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index c1caebe..4c1a087 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -11,6 +11,7 @@ import cats.implicits._ import cats.{Applicative, Foldable, Monad} import cats.effect.{Async, Sync} import cats.effect.kernel.Unique +import com.snowplowanalytics.iglu.core.SchemaCriterion import fs2.{Chunk, Pipe, Stream} import net.snowflake.ingest.utils.{ErrorCode, SFException} import org.typelevel.log4cats.Logger @@ -108,7 +109,7 @@ object Processing { if (writeFailures.isEmpty) empty else { - val indexed = events.toIndexedSeq + val indexed = events.copyToIndexedSeq writeFailures.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) => val event = fastGetByIndex(indexed, failure.index) if (failure.extraCols.nonEmpty) @@ -126,7 +127,7 @@ object Processing { in.through(setLatency(env.metrics)) .through(parseBytes(badProcessor)) - .through(transform(badProcessor)) + .through(transform(badProcessor, env.schemasToSkip)) .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay)) .through(writeToSnowflake(env, badProcessor)) .through(sendFailedEvents(env)) @@ -166,24 +167,28 @@ object Processing { } /** Transform the Event into values compatible with the snowflake ingest sdk */ - private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, TransformedBatch] = + private def transform[F[_]: Sync]( + badProcessor: BadRowProcessor, + schemasToSkip: List[SchemaCriterion] + ): Pipe[F, ParsedBatch, TransformedBatch] = _.evalMap { batch => Sync[F].realTimeInstant.flatMap { now => val loadTstamp = SnowflakeCaster.timestampValue(now) - transformBatch[F](badProcessor, loadTstamp, batch) + transformBatch[F](badProcessor, loadTstamp, batch, schemasToSkip) } } private def transformBatch[F[_]: Sync]( badProcessor: BadRowProcessor, loadTstamp: OffsetDateTime, - batch: ParsedBatch + batch: ParsedBatch, + schemasToSkip: List[SchemaCriterion] ): F[TransformedBatch] = Foldable[List] .traverseSeparateUnordered(batch.events) { event => Sync[F].delay { Transform - .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event) + .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event, schemasToSkip) .map { namedValues => val map = namedValues .map { case Caster.NamedValue(k, v) => @@ -271,7 +276,7 @@ object Processing { val mapped = notWritten match { case Nil => Nil case more => - val indexed = batch.toBeInserted.toIndexedSeq + val indexed = batch.toBeInserted.copyToIndexedSeq more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause)) } abortIfFatalException[F](mapped).as { diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index 4cbe7c2..989edbf 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -64,7 +64,8 @@ object MockEnvironment { maxBytes = 16000000, maxDelay = 10.seconds, uploadConcurrency = 1 - ) + ), + schemasToSkip = List.empty ) MockEnvironment(state, env) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e0bf7e4..c8f86a0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,7 +28,7 @@ object Dependencies { val awsSdk2 = "2.20.135" // Snowplow - val streams = "0.2.0-M1a" + val streams = "0.2.0-M2" // tests val specs2 = "4.20.0"