Skip to content

Commit

Permalink
Add integration test for incomplete sink
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Feb 20, 2024
1 parent 140ff91 commit 547408c
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ object EnrichmentManager {
val iorT = for {
_ <- IorT.fromIor[F](setupEnrichedEvent(raw, enrichedEvent, etlTstamp, processor))
extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
_ = {
enrichedEvent.contexts = ME.formatContexts(extract.contexts).getOrElse("")
enrichedEvent.unstruct_event = ME.formatUnstructEvent(extract.unstructEvent).getOrElse("")
}
} yield extract

iorT.leftMap { violations =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
"region": ${REGION}
"customEndpoint": ${LOCALSTACK_ENDPOINT}
}

"incomplete": {
"type": "Kinesis"
"streamName": ${STREAM_INCOMPLETE}
"region": ${REGION}
"customEndpoint": ${LOCALSTACK_ENDPOINT}
}
}

"monitoring": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ object Containers extends CatsEffect {
"STREAM_RAW" -> streams.raw,
"STREAM_ENRICHED" -> streams.enriched,
"STREAM_BAD" -> streams.bad,
"STREAM_INCOMPLETE" -> streams.incomplete,
"LOCALSTACK_ENDPOINT" -> s"http://$localstackAlias:$localstackPort"
),
fileSystemBind = Seq(
Expand Down Expand Up @@ -225,7 +226,7 @@ object Containers extends CatsEffect {
region: String,
streams: KinesisConfig.Streams
): Unit =
List(streams.raw, streams.enriched, streams.bad).foreach { stream =>
List(streams.raw, streams.enriched, streams.bad, streams.incomplete).foreach { stream =>
localstack.execInContainer(
"aws",
s"--endpoint-url=http://127.0.0.1:$port",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect {
}
}

"emit the correct number of enriched events and bad rows" in {
"emit the correct number of enriched events, bad rows and incomplete events" in {
import utils._

val testName = "count"
Expand All @@ -66,10 +66,11 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect {
resources.use { enrich =>
for {
output <- enrich(input).compile.toList
(good, bad) = parseOutput(output, testName)
(good, bad, incomplete) = parseOutput(output, testName)
} yield {
good.size.toLong must beEqualTo(nbGood)
bad.size.toLong must beEqualTo(nbBad)
incomplete.size.toLong must beEqualTo(nbBad)
}
}
}
Expand Down Expand Up @@ -108,13 +109,14 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect {
resources.use { enrich =>
for {
output <- enrich(input).compile.toList
(good, bad) = parseOutput(output, testName)
(good, bad, incomplete) = parseOutput(output, testName)
} yield {
good.size.toLong must beEqualTo(nbGood)
good.map { enriched =>
enriched.derived_contexts.data.map(_.schema) must containTheSameElementsAs(enrichmentsContexts)
}
bad.size.toLong must beEqualTo(0l)
incomplete.size.toLong must beEqualTo(0l)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ object KinesisConfig {
Some(URI.create(getEndpoint(localstackPort)))
)

def incompleteStreamConfig(localstackPort: Int, streamName: String) =
enrichedStreamConfig(localstackPort, streamName)

val monitoring = Monitoring(
None,
MetricsReporters(None, None, false)
Expand All @@ -67,8 +70,8 @@ object KinesisConfig {
private def getEndpoint(localstackPort: Int): String =
s"http://$endpoint:$localstackPort"

case class Streams(raw: String, enriched: String, bad: String)
case class Streams(raw: String, enriched: String, bad: String, incomplete: String)

def getStreams(uuid: String): Streams =
Streams(s"raw-$uuid", s"enriched-$uuid", s"bad-1-$uuid")
Streams(s"raw-$uuid", s"enriched-$uuid", s"bad-1-$uuid", s"incomplete-$uuid")
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ object utils extends CatsEffect {
object OutputRow {
final case class Good(event: Event) extends OutputRow
final case class Bad(badRow: BadRow) extends OutputRow
final case class Incomplete(incomplete: Event) extends OutputRow
}

def mkEnrichPipe(
Expand All @@ -46,9 +47,12 @@ object utils extends CatsEffect {
} yield {
val enriched = asGood(outputStream(KinesisConfig.enrichedStreamConfig(localstackPort, streams.enriched)))
val bad = asBad(outputStream(KinesisConfig.badStreamConfig(localstackPort, streams.bad)))
val incomplete = asIncomplete(outputStream(KinesisConfig.incompleteStreamConfig(localstackPort, streams.incomplete)))

collectorPayloads =>
enriched.merge(bad)
enriched
.merge(bad)
.merge(incomplete)
.interruptAfter(3.minutes)
.concurrently(collectorPayloads.evalMap(bytes => rawSink(List(bytes))))
}
Expand Down Expand Up @@ -81,13 +85,26 @@ object utils extends CatsEffect {
}
}

def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow]) = {
private def asIncomplete(source: Stream[IO, Array[Byte]]): Stream[IO, OutputRow.Incomplete] =
source.map { bytes =>
OutputRow.Incomplete {
val s = new String(bytes)
Event.parse(s) match {
case Validated.Valid(e) => e
case Validated.Invalid(e) =>
throw new RuntimeException(s"Can't parse incomplete event [$s]. Error: $e")
}
}
}

def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow], List[Event]) = {
val good = output.collect { case OutputRow.Good(e) => e}
println(s"[$testName] Bad rows:")
val bad = output.collect { case OutputRow.Bad(b) =>
println(s"[$testName] ${b.compact}")
b
}
(good, bad)
val incomplete = output.collect { case OutputRow.Incomplete(i) => i}
(good, bad, incomplete)
}
}

0 comments on commit 547408c

Please sign in to comment.