Skip to content

Commit

Permalink
BigQuery JSON column: encode as Jackson JsonNode on write to prevent …
Browse files Browse the repository at this point in the history
…escape
  • Loading branch information
turb committed Nov 14, 2024
1 parent c45685a commit 839a9d1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,14 @@ private[types] object ConverterProvider {
val provider: OverrideTypeProvider =
OverrideTypeProviderFinder.getProvider
tpe match {
case t if provider.shouldOverrideType(c)(t) => q"$tree.toString"
case t if t =:= typeOf[Boolean] => tree
case t if t =:= typeOf[Int] => q"$tree.toLong"
case t if t =:= typeOf[Long] => tree
case t if t =:= typeOf[Float] => q"$tree.toDouble"
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree
case t if provider.shouldOverrideType(c)(t) => q"$tree.toString"
case t if t =:= typeOf[Boolean] => tree
case t if t =:= typeOf[Int] => q"$tree.toLong"
case t if t =:= typeOf[Long] => tree
case t if t =:= typeOf[Float] => q"$tree.toDouble"
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => tree

case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString"
Expand All @@ -198,7 +199,7 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
q"$tree.asJackson"
case t if t =:= typeOf[BigNumeric] =>
q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString"

Expand Down Expand Up @@ -276,12 +277,13 @@ private[types] object ConverterProvider {
tpe match {
case t if provider.shouldOverrideType(c)(t) =>
provider.createInstance(c)(t, q"$tree")
case t if t =:= typeOf[Boolean] => q"$s.toBoolean"
case t if t =:= typeOf[Int] => q"$s.toInt"
case t if t =:= typeOf[Long] => q"$s.toLong"
case t if t =:= typeOf[Float] => q"$s.toFloat"
case t if t =:= typeOf[Double] => q"$s.toDouble"
case t if t =:= typeOf[String] => q"$s"
case t if t =:= typeOf[Boolean] => q"$s.toBoolean"
case t if t =:= typeOf[Int] => q"$s.toInt"
case t if t =:= typeOf[Long] => q"$s.toLong"
case t if t =:= typeOf[Float] => q"$s.toFloat"
case t if t =:= typeOf[Double] => q"$s.toDouble"
case t if t =:= typeOf[String] => q"$s"
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => q"$s"
case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric($s)"

Expand Down Expand Up @@ -412,7 +414,7 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
q"$tree.asJackson"
case t if t =:= typeOf[BigNumeric] =>
q"$tree.wkt"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private[types] object SchemaProvider {
case t if t =:= typeOf[LocalDateTime] => ("DATETIME", Iterable.empty)
case t if t =:= typeOf[Geography] => ("GEOGRAPHY", Iterable.empty)
case t if t =:= typeOf[Json] => ("JSON", Iterable.empty)
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => ("JSON", Iterable.empty)

case t if isCaseClass(t) => ("RECORD", toFields(t))
case _ => throw new RuntimeException(s"Unsupported type: $tpe")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.spotify.scio.bigquery

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.spotify.scio.coders.Coder
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes
Expand Down Expand Up @@ -53,14 +54,20 @@ package object types {
case class Geography(wkt: String)

/**
* Case class to serve as raw type for Json instances to distinguish them from Strings.
* Case class to serve as raw type for Json instances. On write, they will be transformed into
* Jackson JsonNode.
*
* See also https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
*
* @param wkt
* Well Known Text formatted string that BigQuery displays for Json
*/
case class Json(wkt: String)
case class Json(wkt: String) {
def asJackson: JsonNode = Json.mapper.readTree(wkt)
}
object Json {
private val mapper = new ObjectMapper()
}

/**
* Case class to serve as BigNumeric type to distinguish them from Numeric.
Expand Down

0 comments on commit 839a9d1

Please sign in to comment.