Skip to content

Commit

Permalink
[WIP] Add incomplete events (close #)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Feb 10, 2024
1 parent d7070eb commit efd5d31
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ object Enrich {
case None =>
Sync[F].unit
}
} yield (List(badRow.invalid), collectorTstamp)
} yield (List(Left((badRow, None))), collectorTstamp)

/** Build a `generic_error` bad row for unhandled runtime errors */
def genericBadRow(
Expand All @@ -189,11 +189,19 @@ object Enrich {
chunk: List[Result],
env: Environment[F, A]
): F[Unit] = {
val (bad, enriched) =
//val (bad, enriched, incomplete) =
val (bad, enriched, _) =
chunk
.flatMap(_._1)
.map(_.toEither)
.separate
.foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) {
case (previous, item) =>
val (bad, enriched, incomplete) = previous
item match {
case Right(e) => (bad, e :: enriched, incomplete)
case Left((br, Some(i))) => (br :: bad, enriched, i :: incomplete)
case Left((br, _)) => (br :: bad, enriched, incomplete)
}
}

val (moreBad, good) = enriched.map { e =>
serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common

import cats.data.{EitherT, Validated, ValidatedNel}
import cats.data.{EitherT, ValidatedNel}

import com.snowplowanalytics.snowplow.badrows.BadRow

Expand All @@ -25,8 +25,8 @@ package object fs2 {
type ByteSink[F[_]] = List[Array[Byte]] => F[Unit]
type AttributedByteSink[F[_]] = List[AttributedData[Array[Byte]]] => F[Unit]

/** Enrichment result, containing list of (valid and invalid) results as well as the collector timestamp */
type Result = (List[Validated[BadRow, EnrichedEvent]], Option[Long])
type Enriched = Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent]
type Result = (List[Enriched], Option[Long])

/** Function to transform an origin raw payload into good and/or bad rows */
type Enrich[F[_]] = Array[Byte] => F[Result]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common

import cats.Monad
import cats.data.{Validated, ValidatedNel}
import cats.effect.Clock
import cats.effect.kernel.Sync
import cats.implicits._

import org.joda.time.DateTime
Expand Down Expand Up @@ -56,7 +56,7 @@ object EtlPipeline {
* @return the ValidatedMaybeCanonicalOutput. Thanks to flatMap, will include any validation
* errors contained within the ValidatedMaybeCanonicalInput
*/
def processEvents[F[_]: Clock: Monad](
def processEvents[F[_]: Sync](
adapterRegistry: AdapterRegistry[F],
enrichmentRegistry: EnrichmentRegistry[F],
client: IgluCirceClient[F],
Expand All @@ -67,7 +67,7 @@ object EtlPipeline {
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields
): F[List[Validated[BadRow, EnrichedEvent]]] =
): F[List[(Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent])]] =
input match {
case Validated.Valid(Some(payload)) =>
adapterRegistry
Expand All @@ -87,14 +87,13 @@ object EtlPipeline {
registryLookup,
atomicFields
)
.toValidated
}
case Validated.Invalid(badRow) =>
Monad[F].pure(List(badRow.invalid[EnrichedEvent]))
Monad[F].pure(List((Left((badRow, None)))))
}
case Validated.Invalid(badRows) =>
Monad[F].pure(badRows.map(_.invalid[EnrichedEvent])).map(_.toList)
Monad[F].pure(badRows.toList.map(br => (Left((br, None)))))
case Validated.Valid(None) =>
Monad[F].pure(List.empty[Validated[BadRow, EnrichedEvent]])
Monad[F].pure(Nil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): F[Either[BadRow, Unit]] =
): F[Either[BadRow.EnrichmentFailures, Unit]] =
atomicFields.value
.map(validateField(event))
.combineAll match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import java.time.Instant
import org.joda.time.DateTime
import io.circe.Json
import cats.{Applicative, Monad}
import cats.data.{EitherT, NonEmptyList, OptionT, StateT}
import cats.effect.Clock
import cats.data.{EitherT, NonEmptyList, OptionT, StateT, Validated}
import cats.effect.kernel.Sync
import cats.implicits._

import com.snowplowanalytics.refererparser._
Expand Down Expand Up @@ -54,9 +54,8 @@ object EnrichmentManager {
* @param raw Canonical input event to enrich
* @param featureFlags The feature flags available in the current version of Enrich
* @param invalidCount Function to increment the count of invalid events
* @return Enriched event or bad row if a problem occured
*/
def enrichEvent[F[_]: Monad: Clock](
def enrichEvent[F[_]: Sync](
registry: EnrichmentRegistry[F],
client: IgluCirceClient[F],
processor: Processor,
Expand All @@ -65,38 +64,95 @@ object EnrichmentManager {
featureFlags: EtlPipeline.FeatureFlags,
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields
): EitherT[F, BadRow, EnrichedEvent] =
atomicFields: AtomicFields,
emitIncomplete: Boolean = true
): F[Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent]] =
for {
enriched <- EitherT.fromEither[F](setupEnrichedEvent(raw, etlTstamp, processor))
extractResult <- IgluUtils.extractAndValidateInputJsons(enriched, client, raw, processor, registryLookup)
_ = {
ME.formatUnstructEvent(extractResult.unstructEvent).foreach(e => enriched.unstruct_event = e)
ME.formatContexts(extractResult.contexts).foreach(c => enriched.contexts = c)
}
enrichmentsContexts <- runEnrichments(
registry,
processor,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
_ = ME.formatContexts(enrichmentsContexts ::: extractResult.validationInfoContexts).foreach(c => enriched.derived_contexts = c)
_ <- IgluUtils
.validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched, registryLookup)
_ <- EitherT.rightT[F, BadRow](
anonIp(enriched, registry.anonIp).foreach(enriched.user_ipaddress = _)
)
_ <- EitherT.rightT[F, BadRow] {
piiTransform(enriched, registry.piiPseudonymizer).foreach { pii =>
enriched.pii = pii.asString
}
}
_ <- validateEnriched(enriched, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields)
} yield enriched

validatedInput <- mapAndValidateInput(raw, etlTstamp, processor, client, registryLookup)
(schemaViolations, enrichedEvent, extractResult) = validatedInput
enriched <- if (schemaViolations.isEmpty || emitIncomplete)
enrich(
enrichedEvent,
registry,
client,
processor,
raw,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder,
registryLookup
)
else
Sync[F].pure((None, Nil))
(enrichFailures, enrichmentsContexts) = enriched
validationFailures <- if ((schemaViolations.isEmpty && enrichFailures.isEmpty) || emitIncomplete)
validateEnriched(enrichedEvent, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields)
else
Sync[F].pure(None)
badRows = List(schemaViolations, enrichFailures, validationFailures).flatten
output = badRows match {
case Nil =>
Right(enrichedEvent)
case head :: _ =>
if (!emitIncomplete)
Left((head, None))
else {
val failuresContext = createFailuresContext(badRows)
ME.formatContexts(failuresContext :: enrichmentsContexts ::: extractResult.validationInfoContexts)
.foreach(c => enrichedEvent.derived_contexts = c)
Left((head, Some(enrichedEvent)))
}
}
} yield output

// TODO: aggregate all the errors inside same SchemaViolations
def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
etlTstamp: DateTime,
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): F[(Option[BadRow], EnrichedEvent, IgluUtils.EventExtractResult)] =
for {
mapped <- Sync[F].delay(setupEnrichedEvent(raw, etlTstamp, processor))
(enrichmentFailures, enrichedEvent) = mapped
validated <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, raw, processor, registryLookup)
(schemaViolations, sdjs) = validated
maybeBadRow = aggregateBadRows(List(enrichmentFailures, schemaViolations))
} yield (maybeBadRow, enrichedEvent, sdjs)

private def aggregateBadRows(badRows: List[Option[BadRow]]): Option[BadRow] =
badRows.flatten.headOption

def enrich[F[_]: Sync](
enrichedEvent: EnrichedEvent,
registry: EnrichmentRegistry[F],
client: IgluCirceClient[F],
processor: Processor,
raw: RawEvent,
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean,
registryLookup: RegistryLookup[F]
): F[(Option[BadRow.EnrichmentFailures], List[SelfDescribingData[Json]])] =
for {
enriched <- runEnrichments(
enrichedEvent,
registry,
processor,
raw,
inputContexts,
unstructEvent,
legacyOrder
)
(enrichmentFailures, derivedContexts) = enriched
validated <- IgluUtils.validateEnrichmentsContexts[F](client, derivedContexts, raw, processor, enrichedEvent, registryLookup)
(moreFailures, validContexts) = validated
_ <- Sync[F].delay(anonIp(enrichedEvent, registry.anonIp).foreach(enrichedEvent.user_ipaddress = _))
_ <- Sync[F].delay(piiTransform(enrichedEvent, registry.piiPseudonymizer).foreach(pii => enrichedEvent.pii = pii.asString))
} yield (List(enrichmentFailures, moreFailures).flatten.headOption, validContexts)

// TODO return List[FailureDetails.EnrichmentFailure] rather than BadRow
/**
* Run all the enrichments and aggregate the errors if any
* @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
Expand All @@ -105,32 +161,31 @@ object EnrichmentManager {
* with at least one enrichment
*/
private def runEnrichments[F[_]: Monad](
enriched: EnrichedEvent,
registry: EnrichmentRegistry[F],
processor: Processor,
raw: RawEvent,
enriched: EnrichedEvent,
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean
): EitherT[F, BadRow.EnrichmentFailures, List[SelfDescribingData[Json]]] =
EitherT {
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
.map {
case Accumulation(_, failures, contexts) =>
failures.toNel match {
case Some(nel) =>
buildEnrichmentFailuresBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
).asLeft
case None =>
contexts.asRight
}
}
}
): F[(Option[BadRow.EnrichmentFailures], List[SelfDescribingData[Json]])] =
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
.map {
case Accumulation(_, failures, contexts) =>
failures.toNel match {
case Some(nel) =>
val badRow = buildEnrichmentFailuresBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
(Some(badRow), contexts)
case None =>
(None, contexts)
}
}

private[enrichments] case class Accumulation(
event: EnrichedEvent,
Expand Down Expand Up @@ -249,11 +304,12 @@ object EnrichmentManager {
}

/** Create the mutable [[EnrichedEvent]] and initialize it. */
// TODO create SchemaViolations instead of EnrichmentsFailures
private def setupEnrichedEvent(
raw: RawEvent,
etlTstamp: DateTime,
processor: Processor
): Either[BadRow.EnrichmentFailures, EnrichedEvent] = {
): (Option[BadRow.EnrichmentFailures], EnrichedEvent) = {
val e = new EnrichedEvent()
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter
Expand All @@ -271,17 +327,18 @@ object EnrichmentManager {
// Map/validate/transform input fields to enriched event fields
val transformed = Transform.transform(raw, e)

(collectorTstamp |+| transformed)
.leftMap { enrichmentFailures =>
EnrichmentManager.buildEnrichmentFailuresBadRow(
(collectorTstamp |+| transformed) match {
case Validated.Invalid(enrichmentFailures) =>
val badRow = EnrichmentManager.buildEnrichmentFailuresBadRow(
enrichmentFailures,
EnrichedEvent.toPartiallyEnrichedEvent(e),
RawEvent.toRawEvent(raw),
processor
)
}
.as(e)
.toEither
(Some(badRow), e)
case _ =>
(None, e)
}
}

def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[FailureDetails.EnrichmentFailure, Unit] =
Expand Down Expand Up @@ -765,9 +822,17 @@ object EnrichmentManager {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): EitherT[F, BadRow, Unit] =
EitherT {
//We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608
AtomicFieldsLengthValidator.validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields)
}
): F[Option[BadRow.EnrichmentFailures]] =
// We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608
AtomicFieldsLengthValidator
.validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields)
.map {
case Left(br) => Some(br)
case _ => None
}

// TODO
private def createFailuresContext(
badRows: List[BadRow]
): SelfDescribingData[Json] = ???
}
Loading

0 comments on commit efd5d31

Please sign in to comment.