diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 10a82e25c..8643fc83a 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -123,6 +123,10 @@ object EnrichmentManager { val iorT = for { _ <- IorT.fromIor[F](setupEnrichedEvent(raw, enrichedEvent, etlTstamp, processor)) extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup) + _ = { + enrichedEvent.contexts = ME.formatContexts(extract.contexts).getOrElse("") + enrichedEvent.unstruct_event = ME.formatUnstructEvent(extract.unstructEvent).getOrElse("") + } } yield extract iorT.leftMap { violations => diff --git a/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon b/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon index 569eca940..bf441a8f0 100644 --- a/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon +++ b/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon @@ -27,6 +27,13 @@ "region": ${REGION} "customEndpoint": ${LOCALSTACK_ENDPOINT} } + + "incomplete": { + "type": "Kinesis" + "streamName": ${STREAM_INCOMPLETE} + "region": ${REGION} + "customEndpoint": ${LOCALSTACK_ENDPOINT} + } } "monitoring": { diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala index 55b9ec6d7..b4cc0ef82 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala @@ -86,6 +86,7 @@ object Containers extends CatsEffect { "STREAM_RAW" -> streams.raw, "STREAM_ENRICHED" -> streams.enriched, "STREAM_BAD" -> streams.bad, + "STREAM_INCOMPLETE" -> streams.incomplete, "LOCALSTACK_ENDPOINT" -> s"http://$localstackAlias:$localstackPort" ), fileSystemBind = Seq( @@ -225,7 +226,7 @@ object Containers extends CatsEffect { region: String, streams: KinesisConfig.Streams ): Unit = - List(streams.raw, streams.enriched, streams.bad).foreach { stream => + List(streams.raw, streams.enriched, streams.bad, streams.incomplete).foreach { stream => localstack.execInContainer( "aws", s"--endpoint-url=http://127.0.0.1:$port", diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala index 67fca4aca..46306f4d2 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala @@ -42,7 +42,7 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { } } - "emit the correct number of enriched events and bad rows" in { + "emit the correct number of enriched events, bad rows and incomplete events" in { import utils._ val testName = "count" @@ -66,10 +66,11 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { resources.use { enrich => for { output <- enrich(input).compile.toList - (good, bad) = parseOutput(output, testName) + (good, bad, incomplete) = parseOutput(output, testName) } yield { good.size.toLong must beEqualTo(nbGood) bad.size.toLong must beEqualTo(nbBad) + incomplete.size.toLong must beEqualTo(nbBad) } } } @@ -108,13 +109,14 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { resources.use { enrich => for { output <- enrich(input).compile.toList - (good, bad) = parseOutput(output, testName) + (good, bad, incomplete) = parseOutput(output, testName) } yield { good.size.toLong must beEqualTo(nbGood) good.map { enriched => enriched.derived_contexts.data.map(_.schema) must containTheSameElementsAs(enrichmentsContexts) } bad.size.toLong must beEqualTo(0l) + incomplete.size.toLong must beEqualTo(0l) } } } diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala index cf65222a7..91d279af9 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala @@ -59,6 +59,9 @@ object KinesisConfig { Some(URI.create(getEndpoint(localstackPort))) ) + def incompleteStreamConfig(localstackPort: Int, streamName: String) = + enrichedStreamConfig(localstackPort, streamName) + val monitoring = Monitoring( None, MetricsReporters(None, None, false) @@ -67,8 +70,8 @@ object KinesisConfig { private def getEndpoint(localstackPort: Int): String = s"http://$endpoint:$localstackPort" - case class Streams(raw: String, enriched: String, bad: String) + case class Streams(raw: String, enriched: String, bad: String, incomplete: String) def getStreams(uuid: String): Streams = - Streams(s"raw-$uuid", s"enriched-$uuid", s"bad-1-$uuid") + Streams(s"raw-$uuid", s"enriched-$uuid", s"bad-1-$uuid", s"incomplete-$uuid") } diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala index ad3b63508..c6c169d67 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala @@ -34,6 +34,7 @@ object utils extends CatsEffect { object OutputRow { final case class Good(event: Event) extends OutputRow final case class Bad(badRow: BadRow) extends OutputRow + final case class Incomplete(incomplete: Event) extends OutputRow } def mkEnrichPipe( @@ -46,9 +47,12 @@ object utils extends CatsEffect { } yield { val enriched = asGood(outputStream(KinesisConfig.enrichedStreamConfig(localstackPort, streams.enriched))) val bad = asBad(outputStream(KinesisConfig.badStreamConfig(localstackPort, streams.bad))) + val incomplete = asIncomplete(outputStream(KinesisConfig.incompleteStreamConfig(localstackPort, streams.incomplete))) collectorPayloads => - enriched.merge(bad) + enriched + .merge(bad) + .merge(incomplete) .interruptAfter(3.minutes) .concurrently(collectorPayloads.evalMap(bytes => rawSink(List(bytes)))) } @@ -81,13 +85,26 @@ object utils extends CatsEffect { } } - def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow]) = { + private def asIncomplete(source: Stream[IO, Array[Byte]]): Stream[IO, OutputRow.Incomplete] = + source.map { bytes => + OutputRow.Incomplete { + val s = new String(bytes) + Event.parse(s) match { + case Validated.Valid(e) => e + case Validated.Invalid(e) => + throw new RuntimeException(s"Can't parse incomplete event [$s]. Error: $e") + } + } + } + + def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow], List[Event]) = { val good = output.collect { case OutputRow.Good(e) => e} println(s"[$testName] Bad rows:") val bad = output.collect { case OutputRow.Bad(b) => println(s"[$testName] ${b.compact}") b } - (good, bad) + val incomplete = output.collect { case OutputRow.Incomplete(i) => i} + (good, bad, incomplete) } }