From 2f570c3d81c60d3f99db775e173441fd9e78743e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?=
Date: Tue, 9 Jan 2024 11:15:03 +0100
Subject: [PATCH] Alternative `Processing` style

---
 .../DestinationWriter.scala | 9 +
 .../SnowflakeWriter.scala | 162 ++++++++++++++++++
 .../model.scala | 115 +++++++++++++
 .../processing/Processing2.scala | 162 ++++++++++++++++++
 4 files changed, 448 insertions(+)
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/DestinationWriter.scala
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/SnowflakeWriter.scala
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/model.scala
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing2.scala

diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/DestinationWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/DestinationWriter.scala
new file mode 100644
index 0000000..c25cd39
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/DestinationWriter.scala
@@ -0,0 +1,9 @@
+package com.snowplowanalytics.snowplow.snowflake
+
+import com.snowplowanalytics.snowplow.snowflake.model.BatchAfterTransform
+
+trait DestinationWriter[F[_]] {
+
+  def writeBatch(batch: BatchAfterTransform): F[BatchAfterTransform]
+
+}
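
As an aside for reviewers (not part of this patch): `DestinationWriter` is the seam that keeps the new processing pipeline destination-agnostic, so a test double can stand in for the real Snowflake writer. A minimal sketch of such a double, with the hypothetical name `NoopDestinationWriter`, might look like this:

    import cats.Applicative
    import cats.implicits._
    import com.snowplowanalytics.snowplow.sinks.ListOfList
    import com.snowplowanalytics.snowplow.snowflake.DestinationWriter
    import com.snowplowanalytics.snowplow.snowflake.model.BatchAfterTransform

    // Hypothetical test double: pretends every event was inserted successfully,
    // so nothing is left to insert and no new bad rows are accumulated.
    final class NoopDestinationWriter[F[_]: Applicative] extends DestinationWriter[F] {
      def writeBatch(batch: BatchAfterTransform): F[BatchAfterTransform] =
        batch.copy(toBeInserted = ListOfList.empty).pure[F]
    }
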
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/SnowflakeWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/SnowflakeWriter.scala
new file mode 100644
index 0000000..ce9172b
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/SnowflakeWriter.scala
@@ -0,0 +1,162 @@
+package com.snowplowanalytics.snowplow.snowflake
+
+import cats.effect.Sync
+import cats.implicits._
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Payload => BadPayload}
+import com.snowplowanalytics.snowplow.sinks.ListOfList
+import com.snowplowanalytics.snowplow.snowflake.model.{BatchAfterTransform, ParsedWriteResult}
+import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}
+import net.snowflake.ingest.utils.{ErrorCode, SFException}
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+
+final class SnowflakeWriter[F[_]: Sync](
+  badProcessor: Processor,
+  channel: Channel.Provider[F],
+  tableManager: TableManager[F],
+  appHealth: AppHealth[F]
+) extends DestinationWriter[F] {
+
+  private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F]
+
+  def writeBatch(batch: BatchAfterTransform): F[BatchAfterTransform] =
+    for {
+      batch <- writeAttempt1(batch)
+      batch <- writeAttempt2(batch)
+    } yield batch
+
+  /**
+   * First attempt to write events with the Snowflake SDK
+   *
+   * Enqueue failures are expected if the Event contains columns which are not present in the target
+   * table. If this happens, we alter the table ready for the second attempt
+   */
+  private def writeAttempt1(
+    batch: BatchAfterTransform
+  ): F[BatchAfterTransform] =
+    withWriteAttempt(batch) { notWritten =>
+      val parsedResult = ParsedWriteResult.buildFrom(batch.toBeInserted, notWritten)
+      for {
+        _ <- abortIfFatalException(parsedResult.unexpectedFailures)
+        _ <- handleSchemaEvolution(parsedResult.extraColsRequired)
+      } yield {
+        val moreBad = parsedResult.unexpectedFailures.map { case (event, sfe) =>
+          badRowFromEnqueueFailure(event, sfe)
+        }
+        batch.copy(
+          toBeInserted = ListOfList.ofLists(parsedResult.eventsWithExtraCols),
+          badAccumulated = batch.badAccumulated.prepend(moreBad)
+        )
+      }
+    }
+
+  /**
+   * Second attempt to write events with the Snowflake SDK
+   *
+   * This happens after we have attempted to alter the table for any new columns. So insert errors
+   * at this stage are unexpected.
+   */
+  private def writeAttempt2(
+    batch: BatchAfterTransform
+  ): F[BatchAfterTransform] =
+    withWriteAttempt(batch) { notWritten =>
+      val mapped = notWritten match {
+        case Nil => Nil
+        case more =>
+          val indexed = batch.toBeInserted.copyToIndexedSeq
+          more.map(f => (indexed(f.index.toInt)._1, f.cause))
+      }
+      abortIfFatalException(mapped).as {
+        val moreBad = mapped.map { case (event, sfe) =>
+          badRowFromEnqueueFailure(event, sfe)
+        }
+        batch.copy(
+          toBeInserted = ListOfList.empty,
+          badAccumulated = batch.badAccumulated.prepend(moreBad)
+        )
+      }
+    }
+
+  private def withWriteAttempt(
+    batch: BatchAfterTransform
+  )(
+    handleFailures: List[Channel.WriteFailure] => F[BatchAfterTransform]
+  ): F[BatchAfterTransform] = {
+    val attempt: F[BatchAfterTransform] =
+      if (batch.toBeInserted.isEmpty)
+        batch.pure[F]
+      else
+        Sync[F].untilDefinedM {
+          channel.opened
+            .use { channel =>
+              channel.write(batch.toBeInserted.asIterable.map(_._2))
+            }
+            .flatMap {
+              case Channel.WriteResult.ChannelIsInvalid =>
+                // Reset the channel and immediately try again
+                channel.closed.use_.as(none)
+              case Channel.WriteResult.WriteFailures(notWritten) =>
+                handleFailures(notWritten).map(Some(_))
+            }
+        }
+
+    attempt
+      .onError { _ =>
+        appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = false)
+      }
+  }
+
+  /**
+   * Raises an exception if needed
+   *
+   * The Snowflake SDK returns *all* exceptions as though they are equal. But we want to treat them
+   * separately:
+   *   - Problems with data should be handled as Failed Events
+   *   - Runtime problems (e.g. network issue or closed channel) should halt processing, so we don't
+   *     send all events to the bad topic.
+   */
+  private def abortIfFatalException(results: List[(Event, SFException)]): F[Unit] =
+    results.traverse_ { case (_, sfe) =>
+      if (dataIssueVendorCodes.contains(sfe.getVendorCode))
+        Sync[F].unit
+      else
+        Logger[F].error(sfe)("Insert yielded an error which this app cannot tolerate") *>
+          Sync[F].raiseError[Unit](sfe)
+    }
+
+  /**
+   * Alters the table to add any columns that were present in the Events but not currently in the
+   * table
+   */
+  private def handleSchemaEvolution(
+    extraColsRequired: Set[String]
+  ): F[Unit] =
+    if (extraColsRequired.isEmpty)
+      ().pure[F]
+    else
+      channel.closed.surround {
+        tableManager.addColumns(extraColsRequired.toList)
+      }
+
+  private def badRowFromEnqueueFailure(
+    event: Event,
+    cause: SFException
+  ): BadRow =
+    BadRow.LoaderRuntimeError(badProcessor, cause.getMessage, BadPayload.LoaderPayload(event))
+
+  /**
+   * The subset of vendor codes that indicate a problem with *data* rather than problems with the
+   * environment
+   */
+  private val dataIssueVendorCodes: Set[String] =
+    List(
+      ErrorCode.INVALID_FORMAT_ROW,
+      ErrorCode.INVALID_VALUE_ROW,
+      ErrorCode.MAX_ROW_SIZE_EXCEEDED,
+      ErrorCode.UNKNOWN_DATA_TYPE,
+      ErrorCode.NULL_VALUE,
+      ErrorCode.NULL_OR_EMPTY_STRING
+    ).map(_.getMessageCode).toSet
+
+}
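
A note on the retry loop in `withWriteAttempt` (illustration only, not part of this patch): `Sync[F].untilDefinedM` re-runs the effect until it produces a `Some`, which is what drives the `ChannelIsInvalid` branch: the channel is reset, `None` is returned, and the write is attempted again on a fresh channel. The same mechanism in isolation, with made-up names and a cats-effect `Ref` standing in for the channel state:

    import cats.Monad
    import cats.effect.{IO, Ref}

    object RetryLoopSketch {

      // Simulates a write that reports an invalid channel on the first two attempts.
      // None means "retry the whole effect"; Some terminates the loop with a result.
      def attemptWrite(attempts: Ref[IO, Int]): IO[Option[String]] =
        attempts.updateAndGet(_ + 1).map {
          case n if n < 3 => None // like ChannelIsInvalid: reset the channel and go again
          case n          => Some(s"written on attempt $n")
        }

      val demo: IO[String] =
        Ref.of[IO, Int](0).flatMap(ref => Monad[IO].untilDefinedM(attemptWrite(ref)))
    }
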
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/model.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/model.scala
new file mode 100644
index 0000000..2cdcd5e
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/model.scala
@@ -0,0 +1,115 @@
+package com.snowplowanalytics.snowplow.snowflake
+
+import cats.effect.kernel.Unique
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.badrows.BadRow
+import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
+import com.snowplowanalytics.snowplow.sinks.ListOfList
+import com.snowplowanalytics.snowplow.snowflake.processing.Channel
+import net.snowflake.ingest.utils.SFException
+
+object model {
+
+  /** Models used between stages of the processing pipeline */
+
+  case class ParsedBatch(
+    events: List[Event],
+    parseFailures: List[BadRow],
+    countBytes: Long,
+    countItems: Int,
+    token: Unique.Token
+  )
+
+  case class TransformedBatch(
+    events: List[EventWithTransform],
+    parseFailures: List[BadRow],
+    transformFailures: List[BadRow],
+    countBytes: Long,
+    countItems: Int,
+    token: Unique.Token
+  )
+
+  type EventWithTransform = (Event, Map[String, AnyRef])
+
+  /**
+   * State of a batch for all stages post-transform
+   *
+   * @param toBeInserted
+   *   Events from this batch which have not yet been inserted. Events are dropped from this list
+   *   once they have either failed or got inserted.
+   * @param origBatchBytes
+   *   The total size in bytes of events in the original batch. Includes all good and bad events.
+   * @param origBatchCount
+   *   The count of events in the original batch. Includes all good and bad events.
+   * @param badAccumulated
+   *   Events that failed for any reason so far.
+   * @param tokens
+   *   The tokens to be emitted after we have finished processing all events
+   */
+  case class BatchAfterTransform(
+    toBeInserted: ListOfList[EventWithTransform],
+    origBatchBytes: Long,
+    origBatchCount: Int,
+    badAccumulated: ListOfList[BadRow],
+    tokens: Vector[Unique.Token]
+  )
+
+  /**
+   * Result of attempting to enqueue a batch of events to be sent to Snowflake
+   *
+   * @param extraColsRequired
+   *   The column names which were present in the batch but missing in the table
+   * @param eventsWithExtraCols
+   *   Events which failed to be inserted because they contained extra columns which are missing in
+   *   the table. These issues should be resolved once we alter the table.
+   * @param unexpectedFailures
+   *   Events which failed to be inserted for any other reason
+   */
+  case class ParsedWriteResult(
+    extraColsRequired: Set[String],
+    eventsWithExtraCols: List[EventWithTransform],
+    unexpectedFailures: List[(Event, SFException)]
+  )
+
+  object ParsedWriteResult {
+    def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Nil, Nil)
+
+    def buildFrom(events: ListOfList[EventWithTransform], writeFailures: List[Channel.WriteFailure]): ParsedWriteResult =
+      if (writeFailures.isEmpty)
+        empty
+      else {
+        val indexed = events.copyToIndexedSeq
+        writeFailures.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
+          val event = indexed(failure.index.toInt)
+          if (failure.extraCols.nonEmpty)
+            ParsedWriteResult(extraCols ++ failure.extraCols, event :: eventsWithExtraCols, unexpected)
+          else
+            ParsedWriteResult(extraCols, eventsWithExtraCols, (event._1, failure.cause) :: unexpected)
+        }
+      }
+  }
+
+  implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
+    new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
+      def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
+        BatchAfterTransform(
+          toBeInserted = b.toBeInserted.prepend(a.events),
+          origBatchBytes = b.origBatchBytes + a.countBytes,
+          origBatchCount = b.origBatchCount + a.countItems,
+          badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
+          tokens = b.tokens :+ a.token
+        )
+
+      def single(a: TransformedBatch): BatchAfterTransform =
+        BatchAfterTransform(
+          ListOfList.of(List(a.events)),
+          a.countBytes,
+          a.countItems,
+          ListOfList.ofLists(a.parseFailures, a.transformFailures),
+          Vector(a.token)
+        )
+
+      def weightOf(a: TransformedBatch): Long =
+        a.countBytes
+    }
+}
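
To make the `Batchable` semantics concrete (illustration only, not part of this patch): when `BatchUp` merges micro-batches, byte and item counts are summed, bad rows are concatenated, and every original token is kept so all inputs can still be acked at the end. A small sketch, with arbitrary counts and the hypothetical name `BatchableDemo`:

    import cats.effect.IO
    import com.snowplowanalytics.snowplow.snowflake.model._

    object BatchableDemo {
      // Two micro-batches with no events, differing only in counts and tokens.
      // The byte/item counts are arbitrary values chosen for the example.
      val merged: IO[BatchAfterTransform] =
        for {
          t1 <- IO.unique
          t2 <- IO.unique
          b1 = TransformedBatch(Nil, Nil, Nil, countBytes = 100L, countItems = 1, token = t1)
          b2 = TransformedBatch(Nil, Nil, Nil, countBytes = 250L, countItems = 2, token = t2)
        } yield batchable.combine(batchable.single(b1), b2)
      // Result: origBatchBytes = 350, origBatchCount = 3, tokens = Vector(t1, t2)
    }
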
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing2.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing2.scala
new file mode 100644
index 0000000..2846f75
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing2.scala
@@ -0,0 +1,162 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.snowflake.processing
+
+import cats.effect.kernel.Unique
+import cats.effect.{Async, Sync}
+import cats.implicits._
+import cats.{Applicative, Foldable}
+import com.snowplowanalytics.iglu.core.SchemaCriterion
+import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor => BadRowProcessor}
+import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, Transform}
+import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
+import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
+import com.snowplowanalytics.snowplow.sinks.Sink
+import com.snowplowanalytics.snowplow.snowflake.model._
+import com.snowplowanalytics.snowplow.snowflake.{AppHealth, DestinationWriter, Metrics}
+import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
+import fs2.{Chunk, Pipe, Stream}
+
+import java.nio.charset.StandardCharsets
+import java.time.OffsetDateTime
+import scala.concurrent.duration.FiniteDuration
+
+final class Processing2[F[_]: Async](
+  source: SourceAndAck[F],
+  tableManager: TableManager[F],
+  destinationWriter: DestinationWriter[F],
+  badSink: Sink[F],
+  appHealth: AppHealth[F],
+  badRowProcessor: BadRowProcessor,
+  metrics: Metrics[F],
+  config: Processing2.Config
+) {
+
+  def stream: Stream[F, Nothing] = {
+    val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    Stream.eval(tableManager.initializeEventsTable()) *>
+      source.stream(eventProcessingConfig, eventProcessor())
+  }
+
+  private def eventProcessor(): EventProcessor[F] = { in =>
+    in.through(setLatency())
+      .through(parseBytes())
+      .through(transform())
+      .through(BatchUp.withTimeout(config.batchMaxWeight, config.batchMaxDelay))
+      .through(writeToSnowflake())
+      .through(sendFailedEvents())
+      .through(sendMetrics())
+      .through(emitTokens())
+  }
+
+  private def setLatency(): Pipe[F, TokenedEvents, TokenedEvents] =
+    _.evalTap {
+      _.earliestSourceTstamp match {
+        case Some(t) =>
+          for {
+            now <- Sync[F].realTime
+            latencyMillis = now.toMillis - t.toEpochMilli
+            _ <- metrics.setLatencyMillis(latencyMillis)
+          } yield ()
+        case None =>
+          Applicative[F].unit
+      }
+    }
+
+  /** Parse raw bytes into Event using the analytics SDK */
+  private def parseBytes(): Pipe[F, TokenedEvents, ParsedBatch] =
+    _.evalMap { case TokenedEvents(chunk, token, _) =>
+      for {
+        numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk))
+        (badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes =>
+          Sync[F].delay {
+            Event.parseBytes(bytes).toEither.leftMap { failure =>
+              val payload = BadRowRawPayload(StandardCharsets.UTF_8.decode(bytes).toString)
+              BadRow.LoaderParsingError(badRowProcessor, failure, payload)
+            }
+          }
+        }
+      } yield ParsedBatch(events, badRows, numBytes, chunk.size, token)
+    }
+
+  /** Transform the Event into values compatible with the Snowflake Ingest SDK */
+  private def transform(): Pipe[F, ParsedBatch, TransformedBatch] =
+    _.evalMap { batch =>
+      Sync[F].realTimeInstant.flatMap { now =>
+        val loadTstamp = SnowflakeCaster.timestampValue(now)
+        transformBatch(loadTstamp, batch)
+      }
+    }
+
+  private def transformBatch(
+    loadTstamp: OffsetDateTime,
+    batch: ParsedBatch
+  ): F[TransformedBatch] =
+    Foldable[List]
+      .traverseSeparateUnordered(batch.events) { event =>
+        Sync[F].delay {
+          Transform
+            .transformEventUnstructured[AnyRef](badRowProcessor, SnowflakeCaster, SnowflakeJsonFolder, event, config.schemasToSkip)
+            .map { namedValues =>
+              val map = namedValues
+                .map { case Caster.NamedValue(k, v) =>
+                  k -> v
+                }
+                .toMap
+                .updated("load_tstamp", loadTstamp)
+              event -> map
+            }
+        }
+      }
+      .map { case (transformFailures, eventsWithTransforms) =>
+        TransformedBatch(eventsWithTransforms, batch.parseFailures, transformFailures, batch.countBytes, batch.countItems, batch.token)
+      }
+
+  private def writeToSnowflake(): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
+    _.parEvalMap(config.uploadConcurrency) { batch =>
+      destinationWriter.writeBatch(batch)
+    }
+
+  private def sendFailedEvents(): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
+    _.evalTap { batch =>
+      if (batch.badAccumulated.nonEmpty) {
+        val serialized =
+          batch.badAccumulated.mapUnordered(badRow => BadRowsSerializer.withMaxSize(badRow, badRowProcessor, config.badRowMaxSize))
+        badSink
+          .sinkSimple(serialized)
+          .onError { _ =>
+            appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = false)
+          }
+      } else Applicative[F].unit
+    }
+
+  private def sendMetrics(): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
+    _.evalTap { batch =>
+      val countBad = batch.badAccumulated.asIterable.size
+      metrics.addGood(batch.origBatchCount - countBad) *> metrics.addBad(countBad)
+    }
+
+  private def emitTokens(): Pipe[F, BatchAfterTransform, Unique.Token] =
+    _.flatMap { batch =>
+      Stream.emits(batch.tokens)
+    }
+}
+
+object Processing2 {
+
+  final case class Config(
+    schemasToSkip: List[SchemaCriterion],
+    badRowMaxSize: Int,
+    uploadConcurrency: Int,
+    batchMaxWeight: Long,
+    batchMaxDelay: FiniteDuration
+  )
+}
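
Finally, a rough sketch (not part of this patch) of how the new pieces could be wired together at the edge of the application. The `Deps` container, its fields, the processor version and the literal config values are invented for the example; only `DestinationWriter`, `SnowflakeWriter`, `Processing2` and `Processing2.Config` come from this change, and the real application would presumably keep building these dependencies as resources in its environment:

    import cats.effect.IO
    import fs2.Stream
    import scala.concurrent.duration._
    import com.snowplowanalytics.snowplow.badrows.{Processor => BadRowProcessor}
    import com.snowplowanalytics.snowplow.sinks.Sink
    import com.snowplowanalytics.snowplow.snowflake.{AppHealth, DestinationWriter, Metrics, SnowflakeWriter}
    import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, Processing2, TableManager}
    import com.snowplowanalytics.snowplow.sources.SourceAndAck

    object WiringSketch {

      // Hypothetical bag of already-built dependencies, standing in for the app's environment.
      final case class Deps(
        source: SourceAndAck[IO],
        tableManager: TableManager[IO],
        channelProvider: Channel.Provider[IO],
        badSink: Sink[IO],
        appHealth: AppHealth[IO],
        metrics: Metrics[IO]
      )

      def buildStream(deps: Deps): Stream[IO, Nothing] = {
        val badRowProcessor = BadRowProcessor("snowflake-loader", "x.y.z") // illustrative only

        val writer: DestinationWriter[IO] =
          new SnowflakeWriter[IO](badRowProcessor, deps.channelProvider, deps.tableManager, deps.appHealth)

        val config = Processing2.Config(
          schemasToSkip = Nil,
          badRowMaxSize = 9000000, // illustrative values, not defaults
          uploadConcurrency = 3,
          batchMaxWeight = 16000000L,
          batchMaxDelay = 1.second
        )

        new Processing2[IO](
          deps.source,
          deps.tableManager,
          writer,
          deps.badSink,
          deps.appHealth,
          badRowProcessor,
          deps.metrics,
          config
        ).stream
      }
    }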