From b2c9d63885098b4ef791a289b63b15da11761aed Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 21 Nov 2024 21:30:18 +0100 Subject: [PATCH] Validate atomic fields lengths with Enrich defaults --- project/Dependencies.scala | 2 +- .../Configuration.scala | 34 ++++++++++++++----- .../MemorySink.scala | 7 ++-- .../Run.scala | 4 +-- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index bcfcb6d..0498166 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { object V { // Snowplow val snowplowStreamCollector = "3.2.0" - val snowplowCommonEnrich = "4.2.0" + val snowplowCommonEnrich = "5.1.4-rc1" val http4sCirce = "0.23.23" val decline = "2.4.1" diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala index 090ed34..3ec17ca 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala @@ -21,8 +21,8 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLoo import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig} -import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas} -import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdaptersSchemas => EnrichAdaptersSchemas, CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas} +import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig} import fs2.io.file.{Files, Path => FS2Path} @@ -63,10 +63,14 @@ object Configuration { final case class MicroConfig(collector: CollectorConfig[SinkConfig], iglu: IgluResources, enrichmentsConfig: List[EnrichmentConf], - adaptersSchemas: AdaptersSchemas, + enrichConfig: EnrichConfig, outputEnrichedTsv: Boolean) - final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas) + case class EnrichValidation(atomicFieldsLimits: AtomicFields) + final case class EnrichConfig( + adaptersSchemas: EnrichAdaptersSchemas, + validation: EnrichValidation + ) final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO]) @@ -112,11 +116,11 @@ object Configuration { } } - private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = { + private def loadAdaptersSchemas(): EitherT[IO, String, EnrichConfig] = { val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config) //It's not configurable in micro, we load it from reference.conf provided by enrich - loadConfig[AdaptersSchemas](path = None, resolveOrder) + loadConfig[EnrichConfig](path = None, resolveOrder) } private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] = @@ -226,8 +230,8 @@ object Configuration { implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show)) - implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] = - deriveDecoder[AdaptersSchemas] + implicit val enrichConfigDecoder: Decoder[EnrichConfig] = + deriveDecoder[EnrichConfig] implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] = deriveDecoder[EnrichAdaptersSchemas] implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] = @@ -263,4 +267,18 @@ object Configuration { implicit val veroSchemasDecoder: Decoder[VeroSchemas] = deriveDecoder[VeroSchemas] + implicit val validationDecoder: Decoder[EnrichValidation] = + deriveDecoder[EnrichValidation] + implicit val atomicFieldsDecoder: Decoder[AtomicFields] = Decoder[Map[String, Int]].emap { fieldsLimits => + val configuredFields = fieldsLimits.keys.toList + val supportedFields = AtomicFields.supportedFields.map(_.name) + val unsupportedFields = configuredFields.diff(supportedFields) + + if (unsupportedFields.nonEmpty) + Left(s""" + |Configured atomic fields: ${unsupportedFields.mkString("[", ",", "]")} are not supported. + |Supported fields: ${supportedFields.mkString("[", ",", "]")}""".stripMargin) + else + Right(AtomicFields.from(fieldsLimits)) + } } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala index a0506a9..2e4fa47 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala @@ -40,7 +40,8 @@ final class MemorySink(igluClient: IgluCirceClient[IO], enrichmentRegistry: EnrichmentRegistry[IO], outputEnrichedTsv: Boolean, processor: Processor, - adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] { + adapterRegistry: AdapterRegistry[IO], + atomicFieldsLengths: AtomicFields) extends Sink[IO] { override val maxBytes = Int.MaxValue private lazy val logger = LoggerFactory.getLogger("EventLog") @@ -135,8 +136,10 @@ final class MemorySink(igluClient: IgluCirceClient[IO], EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), IO.unit, registryLookup, - AtomicFields.from(Map.empty) + atomicFieldsLengths, + emitIncomplete = false ) + .toEither .subflatMap { enriched => EventConverter.fromEnriched(enriched) .leftMap { failure => diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala index 96949a4..3a36692 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala @@ -56,9 +56,9 @@ object Run { sslContext <- Resource.eval(setupSSLContext()) enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig) badProcessor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas) + adapterRegistry = new AdapterRegistry[IO](Map.empty, config.enrichConfig.adaptersSchemas) lookup = JavaNetRegistryLookup.ioLookupInstance[IO] - sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry) + sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry, config.enrichConfig.validation.atomicFieldsLimits) collectorService = new Service[IO]( config.collector, Sinks(sink, sink),