Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate atomic fields lengths and max JSON depth with Enrich defaults #151

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:

- name: Publish collector locally
run: |
git clone --branch 3.2.0 --depth 1 https://github.com/snowplow/stream-collector.git
git clone --branch 3.2.1 --depth 1 https://github.com/snowplow/stream-collector.git
cd stream-collector
sbt +publishLocal

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

- name: Publish collector locally
run: |
git clone --branch 3.2.0 --depth 1 https://github.com/snowplow/stream-collector.git
git clone --branch 3.2.1 --depth 1 https://github.com/snowplow/stream-collector.git
cd stream-collector
sbt +publishLocal

Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Version 2.1.3 (2024-11-27)
--------------------------
Upgrade collector to 3.2.1
Validate atomic fields lengths and JSON depth with Enrich defaults
Update instructions for local building

Version 2.1.2 (2024-08-27)
--------------------------
Fix a regression with MICRO_IGLU_REGISTRY_URL
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ object Dependencies {

object V {
// Snowplow
val snowplowStreamCollector = "3.2.0"
val snowplowCommonEnrich = "4.2.0"
val snowplowStreamCollector = "3.2.1"
val snowplowCommonEnrich = "5.1.4"
val http4sCirce = "0.23.23"

val decline = "2.4.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig}
import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig}
import fs2.io.file.{Files, Path => FS2Path}
Expand Down Expand Up @@ -63,10 +63,15 @@ object Configuration {
final case class MicroConfig(collector: CollectorConfig[SinkConfig],
iglu: IgluResources,
enrichmentsConfig: List[EnrichmentConf],
adaptersSchemas: AdaptersSchemas,
enrichConfig: EnrichConfig,
outputEnrichedTsv: Boolean)

final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas)
final case class EnrichValidation(atomicFieldsLimits: AtomicFields)
final case class EnrichConfig(
adaptersSchemas: EnrichAdaptersSchemas,
maxJsonDepth: Int,
validation: EnrichValidation
)

final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO])

Expand All @@ -76,10 +81,10 @@ object Configuration {
Cli.config.map { cliConfig =>
for {
collectorConfig <- loadCollectorConfig(cliConfig.collector)
igluResources <- loadIgluResources(cliConfig.iglu)
enrichConfig <- loadEnrichConfig()
igluResources <- loadIgluResources(cliConfig.iglu, enrichConfig.maxJsonDepth)
enrichmentsConfig <- loadEnrichmentConfig(igluResources.client)
adaptersSchemas <- loadAdaptersSchemas()
} yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, adaptersSchemas, cliConfig.outputEnrichedTsv)
} yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, enrichConfig, cliConfig.outputEnrichedTsv)
}
}

Expand All @@ -90,12 +95,12 @@ object Configuration {
loadConfig[CollectorConfig[SinkConfig]](path, resolveOrder)
}

private def loadIgluResources(path: Option[Path]): EitherT[IO, String, IgluResources] = {
private def loadIgluResources(path: Option[Path], maxJsonDepth: Int): EitherT[IO, String, IgluResources] = {
val resolveOrder = (config: TypesafeConfig) =>
config.withFallback(ConfigFactory.parseResources("default-iglu-resolver.conf"))

loadConfig[ResolverConfig](path, resolveOrder)
.flatMap(buildIgluResources)
.flatMap(resolverConfig => buildIgluResources(resolverConfig, maxJsonDepth))
}

private def loadEnrichmentConfig(igluClient: IgluCirceClient[IO]): EitherT[IO, String, List[EnrichmentConf]] = {
Expand All @@ -112,18 +117,18 @@ object Configuration {
}
}

private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = {
def loadEnrichConfig(): EitherT[IO, String, EnrichConfig] = {
val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config)

// This is not configurable in Micro; it is loaded from the reference.conf provided by Enrich.
loadConfig[AdaptersSchemas](path = None, resolveOrder)
loadConfig[EnrichConfig](path = None, resolveOrder)
}

private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] =
private def buildIgluResources(resolverConfig: ResolverConfig, maxJsonDepth: Int): EitherT[IO, String, IgluResources] =
for {
resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show)
completeResolver = resolver.copy(repos = resolver.repos ++ readIgluExtraRegistry())
client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize))
client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize, maxJsonDepth))
} yield IgluResources(completeResolver, client)

private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path,
Expand Down Expand Up @@ -226,8 +231,8 @@ object Configuration {

implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show))

implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] =
deriveDecoder[AdaptersSchemas]
implicit val enrichConfigDecoder: Decoder[EnrichConfig] =
deriveDecoder[EnrichConfig]
implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] =
deriveDecoder[EnrichAdaptersSchemas]
implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] =
Expand Down Expand Up @@ -263,4 +268,18 @@ object Configuration {
implicit val veroSchemasDecoder: Decoder[VeroSchemas] =
deriveDecoder[VeroSchemas]

implicit val validationDecoder: Decoder[EnrichValidation] =
deriveDecoder[EnrichValidation]
implicit val atomicFieldsDecoder: Decoder[AtomicFields] = Decoder[Map[String, Int]].emap { fieldsLimits =>
val configuredFields = fieldsLimits.keys.toList
val supportedFields = AtomicFields.supportedFields.map(_.name)
val unsupportedFields = configuredFields.diff(supportedFields)

if (unsupportedFields.nonEmpty)
Left(s"""
|Configured atomic fields: ${unsupportedFields.mkString("[", ",", "]")} are not supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably nobody will ever see this error message if field lengths are not configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I hesitated to remove it. I thought that we might make this configurable in Micro one day, at which point it would become useful. But I'm fine with removing it if you think that I should.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to leave it, for the reasons you say.

|Supported fields: ${supportedFields.mkString("[", ",", "]")}""".stripMargin)
else
Right(AtomicFields.from(fieldsLimits))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Process
import com.snowplowanalytics.snowplow.collector.core.Sink
import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentManager, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader
import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils
import io.circe.syntax._
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import com.snowplowanalytics.snowplow.micro.Configuration.EnrichConfig

/** Sink of the collector that Snowplow Micro is.
* Contains the functions that are called for each tracking event sent
Expand All @@ -40,10 +41,12 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
enrichmentRegistry: EnrichmentRegistry[IO],
outputEnrichedTsv: Boolean,
processor: Processor,
adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] {
enrichConfig: EnrichConfig) extends Sink[IO] {
override val maxBytes = Int.MaxValue
private lazy val logger = LoggerFactory.getLogger("EventLog")

private val adapterRegistry = new AdapterRegistry[IO](Map.empty, enrichConfig.adaptersSchemas)

override def isHealthy: IO[Boolean] = IO.pure(true)

override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = {
Expand Down Expand Up @@ -75,7 +78,7 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
case Validated.Valid(maybePayload) =>
maybePayload match {
case Some(collectorPayload) =>
adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup).flatMap {
adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup, enrichConfig.maxJsonDepth).flatMap {
case Validated.Valid(rawEvents) =>
val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) {
case ((good, bad), rawEvent) =>
Expand Down Expand Up @@ -135,8 +138,11 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false),
IO.unit,
registryLookup,
AtomicFields.from(Map.empty)
enrichConfig.validation.atomicFieldsLimits,
emitIncomplete = false,
enrichConfig.maxJsonDepth
)
.toEither
.subflatMap { enriched =>
EventConverter.fromEnriched(enriched)
.leftMap { failure =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import com.snowplowanalytics.snowplow.micro.Configuration.{MicroConfig, SinkConf
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.headers.`Strict-Transport-Security`
import org.http4s.server.Server
import org.http4s.server.middleware.{HSTS, Metrics, Timeout, Logger => LoggerMiddleware}
import org.http4s.server.middleware.{HSTS, Metrics, Timeout}
import org.http4s.{HttpApp, HttpRoutes}
import org.typelevel.ci.CIString
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

Expand Down Expand Up @@ -57,9 +56,7 @@ object MicroHttpServer {

private def builder(routes: HttpRoutes[IO], config: CollectorConfig[SinkConfig]): BlazeServerBuilder[IO] = {
BlazeServerBuilder[IO]
.withHttpApp(
loggerMiddleware(timeoutMiddleware(hstsMiddleware(config.hsts, routes.orNotFound), config.networking), config.debug.http)
)
.withHttpApp(timeoutMiddleware(hstsMiddleware(config.hsts, routes.orNotFound), config.networking))
.withIdleTimeout(config.networking.idleTimeout)
.withMaxConnections(config.networking.maxConnections)
.withResponseHeaderTimeout(config.networking.responseHeaderTimeout)
Expand All @@ -69,16 +66,6 @@ object MicroHttpServer {
)
}

private def loggerMiddleware(routes: HttpApp[IO], config: CollectorConfig.Debug.Http): HttpApp[IO] =
if (config.enable) {
LoggerMiddleware.httpApp[IO](
logHeaders = config.logHeaders,
logBody = config.logBody,
redactHeadersWhen = config.redactHeaders.map(CIString(_)).contains(_),
logAction = Some((msg: String) => Logger[IO].debug(msg))
)(routes)
} else routes

private def timeoutMiddleware(routes: HttpApp[IO], networking: CollectorConfig.Networking): HttpApp[IO] =
Timeout.httpApp[IO](timeout = networking.responseHeaderTimeout)(routes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLook
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.collector.core._
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
Expand Down Expand Up @@ -56,9 +55,8 @@ object Run {
sslContext <- Resource.eval(setupSSLContext())
enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig)
badProcessor = Processor(BuildInfo.name, BuildInfo.version)
adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas)
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry)
sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, config.enrichConfig)
collectorService = new Service[IO](
config.collector,
Sinks(sink, sink),
Expand All @@ -68,7 +66,6 @@ object Run {
config.collector.enableDefaultRedirect,
config.collector.rootResponse.enabled,
config.collector.crossDomain.enabled,
config.collector.networking.bodyReadTimeout,
collectorService
).value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry}
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import org.specs2.mutable.SpecificationLike

Expand Down Expand Up @@ -162,12 +161,12 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike

private def createSink(): IO[MemorySink] = {
for {
igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500)
enrichConfig <- Configuration.loadEnrichConfig().value.map(_.getOrElse(throw new IllegalArgumentException("Can't read defaults from Enrich config")))
igluClient <- IgluCirceClient.fromResolver[IO](Resolver[IO](List(Registry.IgluCentral), None), 500, enrichConfig.maxJsonDepth)
enrichmentRegistry = new EnrichmentRegistry[IO]()
processor = Processor(BuildInfo.name, BuildInfo.version)
adapterRegistry = new AdapterRegistry[IO](Map.empty, TestAdapterRegistry.adaptersSchemas)
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
} yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry)
} yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, enrichConfig)
}

}
Loading