From 98c9861d713936620566430b4ed129d711eecde6 Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Thu, 14 Nov 2024 15:25:54 +0100 Subject: [PATCH] BigQuery JSON column: encode as Jackson JsonNode on write to prevent escape --- .../bigquery/types/ConverterProvider.scala | 32 ++++++++++--------- .../scio/bigquery/types/SchemaProvider.scala | 1 + .../spotify/scio/bigquery/types/package.scala | 11 +++++-- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala index 5768d02c2e..a0ded09406 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala @@ -171,13 +171,14 @@ private[types] object ConverterProvider { val provider: OverrideTypeProvider = OverrideTypeProviderFinder.getProvider tpe match { - case t if provider.shouldOverrideType(c)(t) => q"$tree.toString" - case t if t =:= typeOf[Boolean] => tree - case t if t =:= typeOf[Int] => q"$tree.toLong" - case t if t =:= typeOf[Long] => tree - case t if t =:= typeOf[Float] => q"$tree.toDouble" - case t if t =:= typeOf[Double] => tree - case t if t =:= typeOf[String] => tree + case t if provider.shouldOverrideType(c)(t) => q"$tree.toString" + case t if t =:= typeOf[Boolean] => tree + case t if t =:= typeOf[Int] => q"$tree.toLong" + case t if t =:= typeOf[Long] => tree + case t if t =:= typeOf[Float] => q"$tree.toDouble" + case t if t =:= typeOf[Double] => tree + case t if t =:= typeOf[String] => tree + case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => tree case t if t =:= typeOf[BigDecimal] => q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString" @@ -198,7 +199,7 @@ private[types] object ConverterProvider { case t if t =:= 
typeOf[Geography] => q"$tree.wkt" case t if t =:= typeOf[Json] => - q"$tree.wkt" + q"$tree.asJackson" case t if t =:= typeOf[BigNumeric] => q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString" @@ -276,12 +277,13 @@ private[types] object ConverterProvider { tpe match { case t if provider.shouldOverrideType(c)(t) => provider.createInstance(c)(t, q"$tree") - case t if t =:= typeOf[Boolean] => q"$s.toBoolean" - case t if t =:= typeOf[Int] => q"$s.toInt" - case t if t =:= typeOf[Long] => q"$s.toLong" - case t if t =:= typeOf[Float] => q"$s.toFloat" - case t if t =:= typeOf[Double] => q"$s.toDouble" - case t if t =:= typeOf[String] => q"$s" + case t if t =:= typeOf[Boolean] => q"$s.toBoolean" + case t if t =:= typeOf[Int] => q"$s.toInt" + case t if t =:= typeOf[Long] => q"$s.toLong" + case t if t =:= typeOf[Float] => q"$s.toFloat" + case t if t =:= typeOf[Double] => q"$s.toDouble" + case t if t =:= typeOf[String] => q"$s" + case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => q"$s" case t if t =:= typeOf[BigDecimal] => q"_root_.com.spotify.scio.bigquery.Numeric($s)" @@ -412,7 +414,7 @@ private[types] object ConverterProvider { case t if t =:= typeOf[Geography] => q"$tree.wkt" case t if t =:= typeOf[Json] => - q"$tree.wkt" + q"$tree.asJackson" case t if t =:= typeOf[BigNumeric] => q"$tree.wkt" diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index b150b8a7c4..f037acbb08 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -90,6 +90,7 @@ private[types] object SchemaProvider { case t if t =:= typeOf[LocalDateTime] => ("DATETIME", Iterable.empty) case t if t =:= typeOf[Geography] => ("GEOGRAPHY", Iterable.empty) case t if t =:= 
typeOf[Json] => ("JSON", Iterable.empty) + case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => ("JSON", Iterable.empty) case t if isCaseClass(t) => ("RECORD", toFields(t)) case _ => throw new RuntimeException(s"Unsupported type: $tpe") diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala index 08deddd388..6b8542ad3f 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala @@ -53,14 +53,21 @@ package object types { case class Geography(wkt: String) /** - * Case class to serve as raw type for Json instances to distinguish them from Strings. + * Case class to serve as raw type for Json instances. On write, they will be transformed into + * Jackson JsonNode. * * See also https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type * * @param wkt * Well Known Text formatted string that BigQuery displays for Json */ - case class Json(wkt: String) + case class Json(wkt: String) { + /** Parses `wkt` into a Jackson tree using the mapper shared by all Json values. */ + def asJackson: com.fasterxml.jackson.databind.JsonNode = jsonMapper.readTree(wkt) + } + + // ObjectMapper is costly to build and safe to share once configured, so build it once. + private lazy val jsonMapper = new com.fasterxml.jackson.databind.ObjectMapper() /** * Case class to serve as BigNumeric type to distinguish them from Numeric.