From 7172f4ae34e54397032880b06c761642d6feac66 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 21 Nov 2024 21:30:18 +0100 Subject: [PATCH 1/3] Validate atomic fields lengths and JSON depth with Enrich defaults --- project/Dependencies.scala | 2 +- .../Configuration.scala | 47 +++++++++++++------ .../MemorySink.scala | 14 ++++-- .../Run.scala | 4 +- .../MemorySinkSpec.scala | 7 ++- 5 files changed, 48 insertions(+), 26 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index bcfcb6d..e63f8d9 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { object V { // Snowplow val snowplowStreamCollector = "3.2.0" - val snowplowCommonEnrich = "4.2.0" + val snowplowCommonEnrich = "5.1.4" val http4sCirce = "0.23.23" val decline = "2.4.1" diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala index 090ed34..5f81ae5 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala @@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig} import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas} -import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig} import fs2.io.file.{Files, Path => FS2Path} @@ -63,10 +63,15 @@ object Configuration { final case class MicroConfig(collector: CollectorConfig[SinkConfig], iglu: IgluResources, enrichmentsConfig: List[EnrichmentConf], - adaptersSchemas: AdaptersSchemas, + enrichConfig: EnrichConfig, outputEnrichedTsv: Boolean) - final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas) + final case class EnrichValidation(atomicFieldsLimits: AtomicFields) + final case class EnrichConfig( + adaptersSchemas: EnrichAdaptersSchemas, + maxJsonDepth: Int, + validation: EnrichValidation + ) final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO]) @@ -76,10 +81,10 @@ object Configuration { Cli.config.map { cliConfig => for { collectorConfig <- loadCollectorConfig(cliConfig.collector) - igluResources <- loadIgluResources(cliConfig.iglu) + enrichConfig <- loadEnrichConfig() + igluResources <- loadIgluResources(cliConfig.iglu, enrichConfig.maxJsonDepth) enrichmentsConfig <- loadEnrichmentConfig(igluResources.client) - adaptersSchemas <- loadAdaptersSchemas() - } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, adaptersSchemas, cliConfig.outputEnrichedTsv) + } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, enrichConfig, cliConfig.outputEnrichedTsv) } } @@ -90,12 +95,12 @@ object Configuration { loadConfig[CollectorConfig[SinkConfig]](path, resolveOrder) } - private def loadIgluResources(path: Option[Path]): EitherT[IO, String, IgluResources] = { + private def loadIgluResources(path: Option[Path], maxJsonDepth: Int): EitherT[IO, String, IgluResources] = { val resolveOrder = (config: TypesafeConfig) => config.withFallback(ConfigFactory.parseResources("default-iglu-resolver.conf")) loadConfig[ResolverConfig](path, resolveOrder) - .flatMap(buildIgluResources) + .flatMap(resolverConfig => buildIgluResources(resolverConfig, maxJsonDepth)) } private def loadEnrichmentConfig(igluClient: IgluCirceClient[IO]): EitherT[IO, String, List[EnrichmentConf]] = { @@ -112,18 +117,18 @@ object Configuration { } } - private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = { + def loadEnrichConfig(): EitherT[IO, String, EnrichConfig] = { val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config) //It's not configurable in micro, we load it from reference.conf provided by enrich - loadConfig[AdaptersSchemas](path = None, resolveOrder) + loadConfig[EnrichConfig](path = None, resolveOrder) } - private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] = + private def buildIgluResources(resolverConfig: ResolverConfig, maxJsonDepth: Int): EitherT[IO, String, IgluResources] = for { resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show) completeResolver = resolver.copy(repos = resolver.repos ++ readIgluExtraRegistry()) - client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize)) + client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize, maxJsonDepth)) } yield IgluResources(completeResolver, client) private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path, @@ -226,8 +231,8 @@ object Configuration { implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show)) - implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] = - deriveDecoder[AdaptersSchemas] + implicit val enrichConfigDecoder: Decoder[EnrichConfig] = + deriveDecoder[EnrichConfig] implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] = deriveDecoder[EnrichAdaptersSchemas] implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] = @@ -263,4 +268,18 @@ object Configuration { implicit val veroSchemasDecoder: Decoder[VeroSchemas] = deriveDecoder[VeroSchemas] + implicit val validationDecoder: Decoder[EnrichValidation] = + deriveDecoder[EnrichValidation] + implicit val atomicFieldsDecoder: Decoder[AtomicFields] = Decoder[Map[String, Int]].emap { fieldsLimits => + val configuredFields = fieldsLimits.keys.toList + val supportedFields = AtomicFields.supportedFields.map(_.name) + val unsupportedFields = configuredFields.diff(supportedFields) + + if (unsupportedFields.nonEmpty) + Left(s""" + |Configured atomic fields: ${unsupportedFields.mkString("[", ",", "]")} are not supported. + |Supported fields: ${supportedFields.mkString("[", ",", "]")}""".stripMargin) + else + Right(AtomicFields.from(fieldsLimits)) + } } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala index a0506a9..4311e06 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala @@ -21,12 +21,13 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Process import com.snowplowanalytics.snowplow.collector.core.Sink import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent} -import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentManager, EnrichmentRegistry} +import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils import io.circe.syntax._ import org.joda.time.DateTime import org.slf4j.LoggerFactory +import com.snowplowanalytics.snowplow.micro.Configuration.EnrichConfig /** Sink of the collector that Snowplow Micro is. * Contains the functions that are called for each tracking event sent @@ -40,10 +41,12 @@ final class MemorySink(igluClient: IgluCirceClient[IO], enrichmentRegistry: EnrichmentRegistry[IO], outputEnrichedTsv: Boolean, processor: Processor, - adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] { + enrichConfig: EnrichConfig) extends Sink[IO] { override val maxBytes = Int.MaxValue private lazy val logger = LoggerFactory.getLogger("EventLog") + private val adapterRegistry = new AdapterRegistry[IO](Map.empty, enrichConfig.adaptersSchemas) + override def isHealthy: IO[Boolean] = IO.pure(true) override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = { @@ -75,7 +78,7 @@ final class MemorySink(igluClient: IgluCirceClient[IO], case Validated.Valid(maybePayload) => maybePayload match { case Some(collectorPayload) => - adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup).flatMap { + adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup, enrichConfig.maxJsonDepth).flatMap { case Validated.Valid(rawEvents) => val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) { case ((good, bad), rawEvent) => @@ -135,8 +138,11 @@ final class MemorySink(igluClient: IgluCirceClient[IO], EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), IO.unit, registryLookup, - AtomicFields.from(Map.empty) + enrichConfig.validation.atomicFieldsLimits, + emitIncomplete = false, + enrichConfig.maxJsonDepth ) + .toEither .subflatMap { enriched => EventConverter.fromEnriched(enriched) .leftMap { failure => diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala index 96949a4..bd46f88 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala @@ -18,7 +18,6 @@ import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLook import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.collector.core._ import com.snowplowanalytics.snowplow.collector.core.model.Sinks -import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf} import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution} @@ -56,9 +55,8 @@ object Run { sslContext <- Resource.eval(setupSSLContext()) enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig) badProcessor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas) lookup = JavaNetRegistryLookup.ioLookupInstance[IO] - sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry) + sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, config.enrichConfig) collectorService = new Service[IO]( config.collector, Sinks(sink, sink), diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala index a69b4e7..9e08400 100644 --- a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala @@ -16,7 +16,6 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry} import com.snowplowanalytics.snowplow.badrows.Processor -import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import org.specs2.mutable.SpecificationLike @@ -162,12 +161,12 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike private def createSink(): IO[MemorySink] = { for { - igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500) + enrichConfig <- Configuration.loadEnrichConfig().value.map(_.getOrElse(throw new IllegalArgumentException("Can't read defaults from Enrich config"))) + igluClient <- IgluCirceClient.fromResolver[IO](Resolver[IO](List(Registry.IgluCentral), None), 500, enrichConfig.maxJsonDepth) enrichmentRegistry = new EnrichmentRegistry[IO]() processor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = new AdapterRegistry[IO](Map.empty, TestAdapterRegistry.adaptersSchemas) lookup = JavaNetRegistryLookup.ioLookupInstance[IO] - } yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry) + } yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, enrichConfig) } } From a5bfb7dc4d3ce7fa4e8783e41dd1b8071e6d1ee8 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Tue, 26 Nov 2024 16:02:48 +0100 Subject: [PATCH 2/3] Upgrade collector to 3.2.1 --- .github/workflows/deploy.yml | 2 +- .github/workflows/test.yml | 2 +- project/Dependencies.scala | 2 +- .../MicroHttpServer.scala | 17 ++--------------- .../Run.scala | 1 - 5 files changed, 5 insertions(+), 19 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 15246c4..d2ea057 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -28,7 +28,7 @@ jobs: - name: Publish collector locally run: | - git clone --branch 3.2.0 --depth 1 https://github.com/snowplow/stream-collector.git + git clone --branch 3.2.1 --depth 1 https://github.com/snowplow/stream-collector.git cd stream-collector sbt +publishLocal diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 39aff62..7e4bc4c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,7 @@ jobs: - name: Publish collector locally run: | - git clone --branch 3.2.0 --depth 1 https://github.com/snowplow/stream-collector.git + git clone --branch 3.2.1 --depth 1 https://github.com/snowplow/stream-collector.git cd stream-collector sbt +publishLocal diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e63f8d9..bbeb306 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,7 +16,7 @@ object Dependencies { object V { // Snowplow - val snowplowStreamCollector = "3.2.0" + val snowplowStreamCollector = "3.2.1" val snowplowCommonEnrich = "5.1.4" val http4sCirce = "0.23.23" diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala index 39a1687..dca6ac5 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala @@ -19,9 +19,8 @@ import com.snowplowanalytics.snowplow.micro.Configuration.{MicroConfig, SinkConf import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.headers.`Strict-Transport-Security` import org.http4s.server.Server -import org.http4s.server.middleware.{HSTS, Metrics, Timeout, Logger => LoggerMiddleware} +import org.http4s.server.middleware.{HSTS, Metrics, Timeout} import org.http4s.{HttpApp, HttpRoutes} -import org.typelevel.ci.CIString import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -57,9 +56,7 @@ object MicroHttpServer { private def builder(routes: HttpRoutes[IO], config: CollectorConfig[SinkConfig]): BlazeServerBuilder[IO] = { BlazeServerBuilder[IO] - .withHttpApp( - loggerMiddleware(timeoutMiddleware(hstsMiddleware(config.hsts, routes.orNotFound), config.networking), config.debug.http) - ) + .withHttpApp(timeoutMiddleware(hstsMiddleware(config.hsts, routes.orNotFound), config.networking)) .withIdleTimeout(config.networking.idleTimeout) .withMaxConnections(config.networking.maxConnections) .withResponseHeaderTimeout(config.networking.responseHeaderTimeout) @@ -69,16 +66,6 @@ object MicroHttpServer { ) } - private def loggerMiddleware(routes: HttpApp[IO], config: CollectorConfig.Debug.Http): HttpApp[IO] = - if (config.enable) { - LoggerMiddleware.httpApp[IO]( - logHeaders = config.logHeaders, - logBody = config.logBody, - redactHeadersWhen = config.redactHeaders.map(CIString(_)).contains(_), - logAction = Some((msg: String) => Logger[IO].debug(msg)) - )(routes) - } else routes - private def timeoutMiddleware(routes: HttpApp[IO], networking: CollectorConfig.Networking): HttpApp[IO] = Timeout.httpApp[IO](timeout = networking.responseHeaderTimeout)(routes) diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala index bd46f88..6204a70 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala @@ -66,7 +66,6 @@ object Run { config.collector.enableDefaultRedirect, config.collector.rootResponse.enabled, config.collector.crossDomain.enabled, - config.collector.networking.bodyReadTimeout, collectorService ).value From b8aad7002878b4082440bf77f9b4c9e198c6d90d Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Tue, 26 Nov 2024 16:09:07 +0100 Subject: [PATCH 3/3] Prepare for 2.1.3 release --- CHANGELOG | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG b/CHANGELOG index 0607801..cad388a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,9 @@ +Version 2.1.3 (2024-11-27) +-------------------------- +Upgrade collector to 3.2.1 +Validate atomic fields lengths and JSON depth with Enrich defaults +Update instructions for local building + Version 2.1.2 (2024-08-27) -------------------------- Fix a regression with MICRO_IGLU_REGISTRY_URL