Add configuration for incomplete events sink
benjben committed Feb 19, 2024
1 parent a9d6f1b commit da450e8
Showing 27 changed files with 292 additions and 34 deletions.
8 changes: 8 additions & 0 deletions config/config.file.extended.hocon
@@ -31,6 +31,14 @@
"file": "/var/bad"
"maxBytes": 1000000
}

# Incomplete events output
"incomplete": {
# Local FS supported for testing purposes
"type": "FileSystem"
"file": "/var/incomplete"
"maxBytes": 1000000
}
}

# Optional. Concurrency of the app
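For the FileSystem output added above, maxBytes bounds how large the local file may grow. Below is a minimal Scala sketch of a size-capped writer that rolls over to a numbered sibling file once the budget is exceeded; the rotation behaviour and the local path are assumptions for illustration, not the enricher's actual FileSystem sink.

import java.nio.file.{Files, Path, Paths, StandardOpenOption}

// Hypothetical size-capped writer: rolls over to "<file>.1", "<file>.2", ...
// once writing another record would exceed maxBytes. Illustration only.
final class RotatingFileSink(base: Path, maxBytes: Long) {
  private var index   = 0
  private var written = 0L

  private def current: Path =
    if (index == 0) base else Paths.get(s"${base.toString}.$index")

  def write(record: Array[Byte]): Unit = {
    if (written > 0 && written + record.length > maxBytes) {
      index += 1
      written = 0L
    }
    Files.write(current, record :+ '\n'.toByte, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
    written += record.length + 1
  }
}

object RotatingFileSinkExample extends App {
  // "incomplete" stands in for the configured "/var/incomplete" path.
  val sink = new RotatingFileSink(Paths.get("incomplete"), maxBytes = 1000000L)
  sink.write("""{"schema":"iglu:...","data":{}}""".getBytes("UTF-8"))
}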
19 changes: 19 additions & 0 deletions config/config.kafka.extended.hocon
@@ -89,6 +89,25 @@
"acks": "all"
}
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts is emitted in addition to a bad row
"incomplete": {
"type": "Kafka"

# Name of the Kafka topic to write to
"topicName": "incomplete"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}
}
}

# Optional. Concurrency of the app
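The producerConf block is forwarded to the Kafka producer as ordinary producer properties, so any setting from the Kafka producer configuration reference can appear there. A minimal sketch of that plumbing with the Kafka Java client follows; the topic, key choice and payload are illustrative, and this is not the enricher's actual Kafka sink.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

object IncompleteKafkaSinkSketch extends App {
  // Values mirroring the HOCON above.
  val bootstrapServers = "localhost:9092"
  val topicName        = "incomplete"
  val producerConf     = Map("acks" -> "all")

  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  producerConf.foreach { case (k, v) => props.put(k, v) } // producerConf entries are applied as-is

  val producer = new KafkaProducer[String, Array[Byte]](props, new StringSerializer, new ByteArraySerializer)

  // The real sink attaches event attributes; a plain key and payload suffice here.
  val payload = "incomplete enriched event (TSV)".getBytes("UTF-8")
  producer.send(new ProducerRecord[String, Array[Byte]](topicName, "example-key", payload))
  producer.close()
}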
49 changes: 46 additions & 3 deletions config/config.kinesis.extended.hocon
@@ -94,7 +94,7 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy to retry if writing to kinesis fails with unexepected errors
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
@@ -144,7 +144,7 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy to retry if writing to kinesis fails with unexepcted errors
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
@@ -186,7 +186,50 @@
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional. Policy to retry if writing to kinesis fails with unexepcted errors
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}

# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
"recordLimit": 500

# Optional. Limits the number of bytes in a single PutRecords request,
# including records and partition keys.
# Several requests are made in parallel
# Maximum allowed: 5 MB
"byteLimit": 5242880

# Optional. Use a custom Kinesis endpoint.
# Can be used for instance to work locally with localstack
# "customEndpoint": "https://localhost:4566"
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts is emitted in addition to a bad row
"incomplete": {
"type": "Kinesis"

# Name of the Kinesis stream to write to
"streamName": "incomplete"

# Optional. Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
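recordLimit and byteLimit cap each PutRecords request, so the buffered, already-serialized events have to be grouped into request-sized batches before the parallel writes. The sketch below shows only that grouping rule; it is a self-contained illustration, not the sink's code, and it ignores the partition-key bytes that also count toward the 5 MB limit.

object PutRecordsBatchingSketch {

  // Groups serialized records into batches that respect both limits above.
  // Records larger than byteLimit are assumed to have been rejected upstream
  // by the maxRecordSize check.
  def groupForPutRecords(
    records: List[Array[Byte]],
    recordLimit: Int = 500,
    byteLimit: Long = 5242880L
  ): List[List[Array[Byte]]] = {
    val (full, last, _, _) =
      records.foldLeft((List.empty[List[Array[Byte]]], List.empty[Array[Byte]], 0, 0L)) {
        case ((batches, current, count, bytes), record) =>
          val size = record.length.toLong
          if (current.nonEmpty && (count + 1 > recordLimit || bytes + size > byteLimit))
            (current.reverse :: batches, List(record), 1, size)   // start a new batch
          else
            (batches, record :: current, count + 1, bytes + size) // keep filling the current one
      }
    (if (last.nonEmpty) last.reverse :: full else full).reverse
  }
}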
21 changes: 21 additions & 0 deletions config/config.nsq.extended.hocon
@@ -95,6 +95,27 @@
"maxRetries": 10
}
}

# Incomplete events output
"incomplete": {
"type": "Nsq"

# Name of the NSQ topic that will receive the incomplete events
"topic": "incomplete"

# The host name of the nsqd application
"nsqdHost": "127.0.0.1"

# The port number of the nsqd application
"nsqdPort": 4150

# Optional. Policy to retry if writing to NSQ fails
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
}
}

# Optional. Concurrency of the app
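The backoffPolicy fields describe a capped exponential retry: start at minBackoff, back off further on every failed attempt up to maxBackoff, and give up after maxRetries. A small cats-effect sketch of that policy wrapped around an arbitrary publish action; the doubling step is an assumption matching the min/max shape of the config, and the publish itself is a stand-in, not an NSQ client call.

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}

object NsqRetrySketch extends IOApp.Simple {

  // Capped exponential backoff: 100 ms, 200 ms, 400 ms, ... up to maxBackoff,
  // failing for good once maxRetries attempts have been exhausted.
  def retrying[A](
    action: IO[A],
    minBackoff: FiniteDuration = 100.millis,
    maxBackoff: FiniteDuration = 10.seconds,
    maxRetries: Int = 10
  ): IO[A] = {
    def go(attempt: Int, delay: FiniteDuration): IO[A] =
      action.handleErrorWith { err =>
        if (attempt >= maxRetries) IO.raiseError(err)
        else IO.sleep(delay) *> go(attempt + 1, (delay * 2).min(maxBackoff))
      }
    go(0, minBackoff)
  }

  // Stand-in for publishing an incomplete event to nsqd at 127.0.0.1:4150.
  val publish: IO[Unit] = IO(println("publishing incomplete event"))

  def run: IO[Unit] = retrying(publish)
}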
25 changes: 25 additions & 0 deletions config/config.pubsub.extended.hocon
@@ -114,6 +114,31 @@
# Note the PubSub maximum is 10MB
"maxBatchBytes": 8000000
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts is emitted in addition to a bad row
"incomplete": {
"type": "PubSub"

# Name of the PubSub topic that will receive the incomplete events
"topic": "projects/test-project/topics/incomplete"

# Optional. Delay threshold to use for batching.
# After this amount of time has elapsed, the buffered messages are sent,
# even if maxBatchSize and maxBatchBytes have not been reached.
"delayThreshold": 200 milliseconds

# Optional. Maximum number of messages sent within a batch.
# When the buffer reaches this number of messages they are sent.
# PubSub maximum: 1000
"maxBatchSize": 1000

# Optional. Maximum number of bytes sent within a batch.
# When the buffer reaches this size messages are sent.
# Note the PubSub maximum is 10MB
"maxBatchBytes": 8000000
}
}

# Optional. Concurrency of the app
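Taken together, the three settings mean a buffered batch is flushed as soon as any one threshold is hit: the message count reaches maxBatchSize, the accumulated size reaches maxBatchBytes, or delayThreshold has elapsed since the oldest buffered message. A tiny self-contained sketch of that flush decision (the actual batching is performed by the PubSub publisher client):

import scala.concurrent.duration._

object PubSubBatchingSketch {

  final case class BatchState(messages: Int, bytes: Long, ageOfOldest: FiniteDuration)

  // Flush when any one of the three thresholds from the config is reached.
  def shouldFlush(
    state: BatchState,
    delayThreshold: FiniteDuration = 200.millis,
    maxBatchSize: Int = 1000,
    maxBatchBytes: Long = 8000000L
  ): Boolean =
    state.messages >= maxBatchSize ||
      state.bytes >= maxBatchBytes ||
      state.ageOfOldest >= delayThreshold

  // e.g. shouldFlush(BatchState(messages = 12, bytes = 45000L, ageOfOldest = 250.millis)) == true
}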
@@ -72,7 +72,8 @@ object Enrich {
env.featureFlags,
env.metrics.invalidCount,
env.registryLookup,
env.atomicFields
env.atomicFields,
env.sinkIncomplete.isDefined
)

val enriched =
@@ -119,7 +120,8 @@
featureFlags: FeatureFlags,
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields
atomicFields: AtomicFields,
emitIncomplete: Boolean
)(
row: Array[Byte]
): F[Result] = {
@@ -141,7 +143,7 @@
invalidCount,
registryLookup,
atomicFields,
emitIncomplete = true
emitIncomplete
)
} yield (enriched, collectorTstamp)

@@ -190,8 +192,7 @@
chunk: List[Result],
env: Environment[F, A]
): F[Unit] = {
//val (bad, enriched, incomplete) =
val (bad, enriched, _) =
val (bad, enriched, incomplete) =
chunk
.flatMap(_._1)
.foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) {
@@ -209,6 +210,11 @@
.map(bytes => (e, AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e))))
}.separate

val (incompleteTooBig, incompleteBytes) = incomplete.map { e =>
serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
.map(bytes => AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e)))
}.separate

val allBad = (bad ++ moreBad).map(badRowResize(env, _))

List(
@@ -223,7 +229,10 @@
env.processor,
env.streamsSettings.maxRecordSize
) *> env.metrics.enrichLatency(chunk.headOption.flatMap(_._2)),
sinkBad(allBad, env.sinkBad, env.metrics.badCount)
sinkBad(allBad, env.sinkBad, env.metrics.badCount),
if (incompleteTooBig.nonEmpty) Logger[F].warn(s"${incompleteTooBig.size} incomplete events discarded because they are too big")
else Sync[F].unit,
sinkIncomplete(incompleteBytes, env.sinkIncomplete)
).parSequence_
}

@@ -281,6 +290,15 @@
Sync[F].unit
}

def sinkIncomplete[F[_]: Sync](
incomplete: List[AttributedData[Array[Byte]]],
maybeSink: Option[AttributedByteSink[F]]
): F[Unit] =
maybeSink match {
case Some(sink) => sink(incomplete)
case None => Sync[F].unit
}

def serializeEnriched(
enriched: EnrichedEvent,
processor: Processor,
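The new incomplete branch mirrors the handling of good events: each enriched event is serialized, anything exceeding maxRecordSize lands on the Left so it can be counted and logged, and cats' separate splits the two before the Rights are handed to the optional sink. A simplified, self-contained sketch of that step, where string events and a trimmed-down Attributed wrapper stand in for the real EnrichedEvent and AttributedData types:

import cats.syntax.all._

object IncompleteSeparateSketch {

  final case class Attributed[A](data: A, partitionKey: String)

  // Stand-in for serializeEnriched: Left when the serialized event is too big.
  def serialize(event: String, maxRecordSize: Int): Either[String, Array[Byte]] = {
    val bytes = event.getBytes("UTF-8")
    if (bytes.length > maxRecordSize) Left(s"event of ${bytes.length} bytes exceeds $maxRecordSize")
    else Right(bytes)
  }

  // Mirrors `incomplete.map { e => serializeEnriched(...).map(...) }.separate`:
  // the Lefts are the oversized events to warn about, the Rights go to the sink.
  def prepareIncomplete(
    incomplete: List[String],
    maxRecordSize: Int
  ): (List[String], List[Attributed[Array[Byte]]]) =
    incomplete.map { e =>
      serialize(e, maxRecordSize).map(bytes => Attributed(bytes, partitionKey = e.take(8)))
    }.separate
}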
@@ -78,6 +78,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata
* @param sinkGood function that sinks enriched event
* @param sinkPii function that sinks pii event
* @param sinkBad function that sinks an event that failed validation or enrichment
* @param sinkIncomplete function that sinks incomplete events
* @param checkpoint function that checkpoints input stream records
* @param getPayload function that extracts the collector payload bytes from a record
* @param sentry optional sentry client
@@ -111,6 +112,7 @@
sinkGood: AttributedByteSink[F],
sinkPii: Option[AttributedByteSink[F]],
sinkBad: ByteSink[F],
sinkIncomplete: Option[AttributedByteSink[F]],
checkpoint: List[A] => F[Unit],
getPayload: A => Array[Byte],
sentry: Option[SentryClient],
@@ -187,6 +189,7 @@
sinkGood: Resource[F, AttributedByteSink[F]],
sinkPii: Option[Resource[F, AttributedByteSink[F]]],
sinkBad: Resource[F, ByteSink[F]],
sinkIncomplete: Option[Resource[F, AttributedByteSink[F]]],
clients: Resource[F, List[Client[F]]],
checkpoint: List[A] => F[Unit],
getPayload: A => Array[Byte],
@@ -204,6 +207,7 @@
good <- sinkGood
bad <- sinkBad
pii <- sinkPii.sequence
incomplete <- sinkIncomplete.sequence
http4s <- Clients.mkHttp()
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
@@ -231,6 +235,7 @@
good,
pii,
bad,
incomplete,
checkpoint,
getPayload,
sentry,
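Since the incomplete sink is optional, it enters the environment as an Option[Resource[...]], and sinkIncomplete.sequence flips that into a Resource yielding an Option so it can be acquired in the same for-comprehension as the other sinks. A compact sketch of that inversion with cats-effect, with the sink type reduced to a plain function:

import cats.effect.{IO, Resource}
import cats.syntax.all._

object OptionalSinkSketch {

  type ByteSink = List[Array[Byte]] => IO[Unit]

  // An optional sink described as an optional Resource...
  val maybeSinkResource: Option[Resource[IO, ByteSink]] =
    Some(Resource.pure[IO, ByteSink](records => IO(println(s"sinking ${records.size} incomplete events"))))

  // ...becomes a Resource of an optional sink, acquired alongside the others.
  val acquired: Resource[IO, Option[ByteSink]] = maybeSinkResource.sequence

  // Downstream code then falls back to a no-op when the sink is absent.
  val use: IO[Unit] = acquired.use {
    case Some(sink) => sink(List("incomplete event".getBytes("UTF-8")))
    case None       => IO.unit
  }
}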
@@ -57,6 +57,7 @@ object Run {
mkSinkGood: Output => Resource[F, AttributedByteSink[F]],
mkSinkPii: Output => Resource[F, AttributedByteSink[F]],
mkSinkBad: Output => Resource[F, ByteSink[F]],
mkSinkIncomplete: Output => Resource[F, AttributedByteSink[F]],
checkpoint: List[A] => F[Unit],
mkClients: BlobStorageClients => List[Resource[F, Client[F]]],
getPayload: A => Array[Byte],
@@ -89,6 +90,7 @@
case _ =>
mkSinkBad(file.output.bad)
}
sinkIncomplete = file.output.incomplete.map(out => initAttributedSink(out, mkSinkIncomplete))
clients = mkClients(file.blobStorage).sequence
exit <- file.input match {
case p: Input.FileSystem =>
@@ -100,6 +102,7 @@
sinkGood,
sinkPii,
sinkBad,
sinkIncomplete,
clients,
_ => Sync[F].unit,
identity,
@@ -130,6 +133,7 @@
sinkGood,
sinkPii,
sinkBad,
sinkIncomplete,
clients,
checkpointing,
getPayload,
@@ -59,20 +59,23 @@ object ConfigFile {

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _, _, _) if aup._1 <= 0L =>
case c: ConfigFile if c.assetsUpdatePeriod.exists(_.length <= 0L) =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _, _, _)
if output.streamName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _)
if topicName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
case c: ConfigFile =>
val Outputs(good, pii, bad, incomplete) = c.output
val piiCleaned = pii match {
case Some(ki: Output.Kinesis) if ki.streamName.isEmpty => None
case Some(p: Output.PubSub) if p.topic.isEmpty => None
case Some(ka: Output.Kafka) if ka.topicName.isEmpty => None
case _ => pii
}
val incompleteCleaned = incomplete match {
case Some(ki: Output.Kinesis) if ki.streamName.isEmpty => None
case Some(p: Output.PubSub) if p.topic.isEmpty => None
case Some(ka: Output.Kafka) if ka.topicName.isEmpty => None
case _ => incomplete
}
c.copy(output = Outputs(good, piiCleaned, bad, incompleteCleaned)).asRight
}

/* Defines where to look for default values if they are not in the provided file
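The rewritten decoder applies one rule to both optional outputs: if the configured destination name is empty (Kinesis stream, PubSub topic or Kafka topic), the output is treated as absent. A self-contained sketch of that cleanup over a simplified Output model (the real code runs this inside the circe emap shown above):

object OutputCleanupSketch {

  sealed trait Output
  final case class Kinesis(streamName: String) extends Output
  final case class PubSub(topic: String) extends Output
  final case class Kafka(topicName: String) extends Output
  final case class FileSystem(file: String) extends Output

  final case class Outputs(
    good: Output,
    pii: Option[Output],
    bad: Output,
    incomplete: Option[Output]
  )

  // An optional output with an empty destination name is dropped.
  def dropIfEmpty(output: Option[Output]): Option[Output] =
    output match {
      case Some(Kinesis(s)) if s.isEmpty => None
      case Some(PubSub(t)) if t.isEmpty  => None
      case Some(Kafka(t)) if t.isEmpty   => None
      case other                         => other
    }

  // Applied to both the pii and the incomplete outputs, as in the decoder above.
  def clean(outputs: Outputs): Outputs =
    outputs.copy(pii = dropIfEmpty(outputs.pii), incomplete = dropIfEmpty(outputs.incomplete))
}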
@@ -209,7 +209,8 @@ object io {
case class Outputs(
good: Output,
pii: Option[Output],
bad: Output
bad: Output,
incomplete: Option[Output]
)
object Outputs {
implicit val outputsDecoder: Decoder[Outputs] = deriveConfiguredDecoder[Outputs]