Alternative Processing style
pondzix committed Jan 9, 2024
1 parent c8ae9aa commit 2f570c3
Showing 4 changed files with 448 additions and 0 deletions.
@@ -0,0 +1,9 @@
package com.snowplowanalytics.snowplow.snowflake

import com.snowplowanalytics.snowplow.snowflake.model.BatchAfterTransform

trait DestinationWriter[F[_]] {

def writeBatch(batch: BatchAfterTransform): F[BatchAfterTransform]

}
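
For context, a minimal sketch (hypothetical wiring, not part of this commit) of how a DestinationWriter[F] implementation might be driven by the processing stream: each transformed batch goes through writeBatch, and the returned batch carries whatever bad rows and tokens accumulated during the write.

import cats.Monad
import cats.implicits._
import com.snowplowanalytics.snowplow.snowflake.model.BatchAfterTransform

// Hypothetical helper, for illustration only: write batches in order and keep the
// post-write results so their tokens can be emitted once processing is complete.
def writeAll[F[_]: Monad](writer: DestinationWriter[F], batches: List[BatchAfterTransform]): F[List[BatchAfterTransform]] =
  batches.traverse(writer.writeBatch)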
@@ -0,0 +1,162 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Payload => BadPayload}
import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.snowflake.model.{BatchAfterTransform, ParsedWriteResult}
import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}
import net.snowflake.ingest.utils.{ErrorCode, SFException}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

final class SnowflakeWriter[F[_]: Sync](
badProcessor: Processor,
channel: Channel.Provider[F],
tableManager: TableManager[F],
appHealth: AppHealth[F]
) extends DestinationWriter[F] {

private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F]

def writeBatch(batch: BatchAfterTransform): F[BatchAfterTransform] =
for {
batch <- writeAttempt1(batch)
batch <- writeAttempt2(batch)
} yield batch

/**
* First attempt to write events with the Snowflake SDK
*
* Enqueue failures are expected if the Event contains columns which are not present in the target
* table. If this happens, we alter the table so that it is ready for the second attempt.
*/
private def writeAttempt1(
batch: BatchAfterTransform
): F[BatchAfterTransform] =
withWriteAttempt(batch) { notWritten =>
val parsedResult = ParsedWriteResult.buildFrom(batch.toBeInserted, notWritten)
for {
_ <- abortIfFatalException(parsedResult.unexpectedFailures)
_ <- handleSchemaEvolution(parsedResult.extraColsRequired)
} yield {
val moreBad = parsedResult.unexpectedFailures.map { case (event, sfe) =>
badRowFromEnqueueFailure(event, sfe)
}
batch.copy(
toBeInserted = ListOfList.ofLists(parsedResult.eventsWithExtraCols),
badAccumulated = batch.badAccumulated.prepend(moreBad)
)
}
}

/**
* Second attempt to write events with the Snowflake SDK
*
* This happens after we have attempted to alter the table for any new columns. So insert errors
* at this stage are unexpected.
*/
private def writeAttempt2(
batch: BatchAfterTransform
): F[BatchAfterTransform] =
withWriteAttempt(batch) { notWritten =>
val mapped = notWritten match {
case Nil => Nil
case more =>
val indexed = batch.toBeInserted.copyToIndexedSeq
more.map(f => (indexed(f.index.toInt)._1, f.cause))
}
abortIfFatalException(mapped).as {
val moreBad = mapped.map { case (event, sfe) =>
badRowFromEnqueueFailure(event, sfe)
}
batch.copy(
toBeInserted = ListOfList.empty,
badAccumulated = batch.badAccumulated.prepend(moreBad)
)
}
}

private def withWriteAttempt(
batch: BatchAfterTransform
)(
handleFailures: List[Channel.WriteFailure] => F[BatchAfterTransform]
): F[BatchAfterTransform] = {
val attempt: F[BatchAfterTransform] =
if (batch.toBeInserted.isEmpty)
batch.pure[F]
else
Sync[F].untilDefinedM {
channel.opened
.use { channel =>
channel.write(batch.toBeInserted.asIterable.map(_._2))
}
.flatMap {
case Channel.WriteResult.ChannelIsInvalid =>
// Reset the channel and immediately try again
channel.closed.use_.as(none)
case Channel.WriteResult.WriteFailures(notWritten) =>
handleFailures(notWritten).map(Some(_))
}
}

attempt
.onError { _ =>
appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = false)
}
}

/**
* Raises an exception if needed
*
* The Snowflake SDK returns *all* exceptions as though they are equal. But we want to treat them
* separately:
* - Problems with data should be handled as Failed Events
* - Runtime problems (e.g. network issue or closed channel) should halt processing, so we don't
* send all events to the bad topic.
*/
private def abortIfFatalException(results: List[(Event, SFException)]): F[Unit] =
results.traverse_ { case (_, sfe) =>
if (dataIssueVendorCodes.contains(sfe.getVendorCode))
Sync[F].unit
else
Logger[F].error(sfe)("Insert yielded an error which this app cannot tolerate") *>
Sync[F].raiseError[Unit](sfe)
}

/**
* Alters the table to add any columns that were present in the Events but not currently in the
* table
*/
private def handleSchemaEvolution(
extraColsRequired: Set[String]
): F[Unit] =
if (extraColsRequired.isEmpty)
().pure[F]
else
channel.closed.surround {
tableManager.addColumns(extraColsRequired.toList)
}

private def badRowFromEnqueueFailure(
event: Event,
cause: SFException
): BadRow =
BadRow.LoaderRuntimeError(badProcessor, cause.getMessage, BadPayload.LoaderPayload(event))

/**
* The subset of vendor codes that indicate a problem with *data* rather than a problem with the
* environment
*/
private val dataIssueVendorCodes: Set[String] =
List(
ErrorCode.INVALID_FORMAT_ROW,
ErrorCode.INVALID_VALUE_ROW,
ErrorCode.MAX_ROW_SIZE_EXCEEDED,
ErrorCode.UNKNOWN_DATA_TYPE,
ErrorCode.NULL_VALUE,
ErrorCode.NULL_OR_EMPTY_STRING
).map(_.getMessageCode).toSet

}
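
A minimal sketch of how this class could be instantiated, assuming the channel provider, table manager and app health values come from the application's resource setup (the names below are hypothetical). The resulting value then serves as the DestinationWriter used by the rest of the pipeline, following the two attempts above: insert, evolve the table for any missing columns, then insert the remaining events once more.

import cats.effect.IO
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}

// Hypothetical wiring, for illustration only.
def makeWriter(
  badProcessor: Processor,
  channelProvider: Channel.Provider[IO],
  tableManager: TableManager[IO],
  appHealth: AppHealth[IO]
): DestinationWriter[IO] =
  new SnowflakeWriter[IO](badProcessor, channelProvider, tableManager, appHealth)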
@@ -0,0 +1,115 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.kernel.Unique
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.snowflake.processing.Channel
import net.snowflake.ingest.utils.SFException

object model {

/** Model used between stages of the processing pipeline */

case class ParsedBatch(
events: List[Event],
parseFailures: List[BadRow],
countBytes: Long,
countItems: Int,
token: Unique.Token
)

case class TransformedBatch(
events: List[EventWithTransform],
parseFailures: List[BadRow],
transformFailures: List[BadRow],
countBytes: Long,
countItems: Int,
token: Unique.Token
)

type EventWithTransform = (Event, Map[String, AnyRef])

/**
* State of a batch for all stages post-transform
*
* @param toBeInserted
* Events from this batch which have not yet been inserted. Events are dropped from this list
* once they have either failed or been inserted.
* @param origBatchBytes
* The total size in bytes of events in the original batch. Includes all good and bad events.
* @param origBatchCount
* The count of events in the original batch. Includes all good and bad events.
* @param badAccumulated
* Events that failed for any reason so far.
* @param tokens
* The tokens to be emitted after we have finished processing all events
*/
case class BatchAfterTransform(
toBeInserted: ListOfList[EventWithTransform],
origBatchBytes: Long,
origBatchCount: Int,
badAccumulated: ListOfList[BadRow],
tokens: Vector[Unique.Token]
)

/**
* Result of attempting to enqueue a batch of events to be sent to Snowflake
*
* @param extraColsRequired
* The column names which were present in the batch but missing in the table
* @param eventsWithExtraCols
* Events which failed to be inserted because they contained extra columns that are missing in the
* table. These failures should be resolved once we alter the table.
* @param unexpectedFailures
* Events which failed to be inserted for any other reason
*/
case class ParsedWriteResult(
extraColsRequired: Set[String],
eventsWithExtraCols: List[EventWithTransform],
unexpectedFailures: List[(Event, SFException)]
)

object ParsedWriteResult {
def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Nil, Nil)

def buildFrom(events: ListOfList[EventWithTransform], writeFailures: List[Channel.WriteFailure]): ParsedWriteResult =
if (writeFailures.isEmpty)
empty
else {
val indexed = events.copyToIndexedSeq
writeFailures.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
val event = indexed(failure.index.toInt)
if (failure.extraCols.nonEmpty)
ParsedWriteResult(extraCols ++ failure.extraCols, event :: eventsWithExtraCols, unexpected)
else
ParsedWriteResult(extraCols, eventsWithExtraCols, (event._1, failure.cause) :: unexpected)
}
}
}

implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
BatchAfterTransform(
toBeInserted = b.toBeInserted.prepend(a.events),
origBatchBytes = b.origBatchBytes + a.countBytes,
origBatchCount = b.origBatchCount + a.countItems,
badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
tokens = b.tokens :+ a.token
)

def single(a: TransformedBatch): BatchAfterTransform =
BatchAfterTransform(
ListOfList.of(List(a.events)),
a.countBytes,
a.countItems,
ListOfList.ofLists(a.parseFailures, a.transformFailures),
Vector(a.token)
)

def weightOf(a: TransformedBatch): Long =
a.countBytes
}
}
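
To illustrate the Batchable instance above (the real pipeline presumably drives it through BatchUp itself), a plain fold shows the intended semantics of single and combine: transformed micro-batches are merged into one BatchAfterTransform whose weight is the accumulated byte count.

import com.snowplowanalytics.snowplow.snowflake.model._

// Illustration only: fold several transformed micro-batches into one batch.
def coalesce(batches: List[TransformedBatch]): Option[BatchAfterTransform] =
  batches match {
    case Nil          => None
    case head :: rest => Some(rest.foldLeft(batchable.single(head))(batchable.combine))
  }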