Skip to content

Commit

Permalink
BigQuery JSON column: encode as Jackson JsonNode on write to prevent …
Browse files Browse the repository at this point in the history
…escape
  • Loading branch information
turb committed Nov 14, 2024
1 parent c45685a commit 986438b
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,14 @@ private[types] object ConverterProvider {
val provider: OverrideTypeProvider =
OverrideTypeProviderFinder.getProvider
tpe match {
case t if provider.shouldOverrideType(c)(t) => q"$tree.toString"
case t if t =:= typeOf[Boolean] => tree
case t if t =:= typeOf[Int] => q"$tree.toLong"
case t if t =:= typeOf[Long] => tree
case t if t =:= typeOf[Float] => q"$tree.toDouble"
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree
case t if provider.shouldOverrideType(c)(t) => q"$tree.toString"
case t if t =:= typeOf[Boolean] => tree
case t if t =:= typeOf[Int] => q"$tree.toLong"
case t if t =:= typeOf[Long] => tree
case t if t =:= typeOf[Float] => q"$tree.toDouble"
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => tree

case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString"
Expand All @@ -198,7 +199,7 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
q"$tree.asJackson"
case t if t =:= typeOf[BigNumeric] =>
q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString"

Expand Down Expand Up @@ -276,12 +277,13 @@ private[types] object ConverterProvider {
tpe match {
case t if provider.shouldOverrideType(c)(t) =>
provider.createInstance(c)(t, q"$tree")
case t if t =:= typeOf[Boolean] => q"$s.toBoolean"
case t if t =:= typeOf[Int] => q"$s.toInt"
case t if t =:= typeOf[Long] => q"$s.toLong"
case t if t =:= typeOf[Float] => q"$s.toFloat"
case t if t =:= typeOf[Double] => q"$s.toDouble"
case t if t =:= typeOf[String] => q"$s"
case t if t =:= typeOf[Boolean] => q"$s.toBoolean"
case t if t =:= typeOf[Int] => q"$s.toInt"
case t if t =:= typeOf[Long] => q"$s.toLong"
case t if t =:= typeOf[Float] => q"$s.toFloat"
case t if t =:= typeOf[Double] => q"$s.toDouble"
case t if t =:= typeOf[String] => q"$s"
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => q"$s"
case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric($s)"

Expand Down Expand Up @@ -412,7 +414,7 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
q"$tree.asJackson"
case t if t =:= typeOf[BigNumeric] =>
q"$tree.wkt"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private[types] object SchemaProvider {
case t if t =:= typeOf[LocalDateTime] => ("DATETIME", Iterable.empty)
case t if t =:= typeOf[Geography] => ("GEOGRAPHY", Iterable.empty)
case t if t =:= typeOf[Json] => ("JSON", Iterable.empty)
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => ("JSON", Iterable.empty)

case t if isCaseClass(t) => ("RECORD", toFields(t))
case _ => throw new RuntimeException(s"Unsupported type: $tpe")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.spotify.scio.bigquery

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.spotify.scio.coders.Coder
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes
Expand Down Expand Up @@ -53,14 +54,20 @@ package object types {
case class Geography(wkt: String)

/**
* Case class to serve as raw type for Json instances to distinguish them from Strings.
* Case class to serve as raw type for Json instances. On write, they will be transformed into
* Jackson JsonNode.
*
* See also https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
*
* @param wkt
* Well Known Text formatted string that BigQuery displays for Json
*/
case class Json(wkt: String)
case class Json(wkt: String) {
def asJackson: JsonNode = Json.mapper.readTree(wkt)
}
object Json {
private val mapper = new ObjectMapper()
}

/**
* Case class to serve as BigNumeric type to distinguish them from Numeric.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ final class ConverterProviderSpec
.retryUntil(_.precision <= Numeric.MaxNumericPrecision)
.map(Numeric.apply)
}
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
key <- Gen.alphaStr
value <- Gen.alphaStr
} yield Json("{\"" + key + "\":\"" + value + "\"}")
)
implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
implicit val eqInstant: Eq[Instant] = Eq.instance[Instant](_ == _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.spotify.scio.bigquery.types

import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import com.spotify.scio.bigquery._
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec
Expand Down Expand Up @@ -48,8 +49,14 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {

it should "handle required json type" in {
val wkt = "{\"name\": \"Alice\", \"age\": 30}"
val jsNodeFactory = new JsonNodeFactory(false)
val jackson = jsNodeFactory
.objectNode()
.set[ObjectNode]("name", jsNodeFactory.textNode("Alice"))
.set[ObjectNode]("age", jsNodeFactory.numberNode(30))

RequiredJson.fromTableRow(TableRow("a" -> wkt)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> wkt)
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> jackson)
}

it should "handle case classes with methods" in {
Expand Down

0 comments on commit 986438b

Please sign in to comment.