From 6f546543c293a64a40ecf098cd04f38729979e1c Mon Sep 17 00:00:00 2001
From: psainics <122770897+psainics@users.noreply.github.com>
Date: Tue, 18 Jul 2023 10:25:43 +0530
Subject: [PATCH] JSON Support

---
 docs/BigQueryTable-batchsink.md               |   7 +
 .../bigquery/sink/AbstractBigQuerySink.java   |  99 +++++++-
 .../sink/AbstractBigQuerySinkConfig.java      |  12 +
 .../bigquery/sink/BigQueryJsonConverter.java  |  11 +-
 .../gcp/bigquery/sink/BigQueryMultiSink.java  |  12 +-
 .../bigquery/sink/BigQueryOutputFormat.java   |   7 +-
 .../bigquery/sink/BigQueryRecordToJson.java   | 111 +++++++--
 .../bigquery/sink/BigQueryRecordWriter.java   |   8 +-
 .../gcp/bigquery/sink/BigQuerySink.java       |   9 +-
 .../gcp/bigquery/sink/BigQuerySinkUtils.java  |  55 +++++
 .../gcp/bigquery/util/BigQueryConstants.java  |   1 +
 .../gcp/bigquery/util/BigQueryUtil.java       |   3 +-
 .../bigquery/BigQueryRecordToJsonTest.java    | 232 +++++++++++++++++-
 .../sink/AbstractBigQuerySinkTest.java        |  89 +++++++
 widgets/BigQueryMultiTable-batchsink.json     |   6 +
 widgets/BigQueryTable-batchsink.json          |   6 +
 16 files changed, 641 insertions(+), 27 deletions(-)
 create mode 100644 src/test/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkTest.java

diff --git a/docs/BigQueryTable-batchsink.md b/docs/BigQueryTable-batchsink.md
index 445c37efda..23a92712a4 100644
--- a/docs/BigQueryTable-batchsink.md
+++ b/docs/BigQueryTable-batchsink.md
@@ -52,6 +52,13 @@ bucket will be created and then deleted after the run finishes.
 **GCS Upload Request Chunk Size**: GCS upload request chunk size in bytes. Default value is 8388608 bytes.
 
+**JSON String**: List of fields to be written to BigQuery as JSON strings.
+The fields must be of type STRING. To target nested fields, use dot notation;
+for example, 'name.first' targets the 'first' field in the 'name' record. (Macro Enabled)
+
+To specify multiple fields, provide a comma-separated list.
+Example: "nestedObject.nestedArray.raw, nestedArray.raw".
+
 **Operation**: Type of write operation to perform. This can be set to Insert, Update or Upsert.
 * Insert - all records will be inserted in destination table.
 * Update - records that match on Table Key will be updated in the table.
 Records that do not match
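A quick sketch of the behavior this doc entry describes, mirroring the JsonTreeWriter usage in the new BigQueryRecordToJsonTest further down; the class name and the field name 'raw' are hypothetical:

```java
import com.google.common.collect.ImmutableSet;
import com.google.gson.internal.bind.JsonTreeWriter;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson;

import java.io.IOException;
import java.util.Objects;

class JsonStringFieldSketch {
  public static void main(String[] args) throws IOException {
    // A STRING field holding serialized JSON; 'raw' is a hypothetical name.
    Schema schema = Schema.recordOf("record",
      Schema.Field.of("raw", Schema.of(Schema.Type.STRING)));
    StructuredRecord record = StructuredRecord.builder(schema)
      .set("raw", "{\"first\":\"John\"}")
      .build();
    try (JsonTreeWriter writer = new JsonTreeWriter()) {
      writer.beginObject();
      // Listing "raw" among the JSON-string paths expands it to a native JSON value.
      BigQueryRecordToJson.write(writer, "raw", record.get("raw"),
                                 Objects.requireNonNull(schema.getField("raw")).getSchema(),
                                 ImmutableSet.of("raw"));
      writer.endObject();
      // Prints {"raw":{"first":"John"}} rather than an escaped string.
      System.out.println(writer.get());
    }
  }
}
```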
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
index 88b7eed861..2ef3f2d7eb 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
@@ -19,6 +19,7 @@
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.Table;
 import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
 import com.google.cloud.kms.v1.CryptoKeyName;
@@ -47,7 +48,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -145,7 +150,8 @@ public void transform(StructuredRecord input, Emitter<KeyValue<StructuredRecord, NullWritable>> emitter)
 
   protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String fqn,
                                   String tableName, @Nullable Schema tableSchema, String bucket,
-                                  FailureCollector collector, @Nullable String datasetName) throws IOException {
+                                  FailureCollector collector, @Nullable String datasetName,
+                                  @Nullable Table table) throws IOException {
     LOG.debug("Init output for table '{}' with schema: {}", tableName, tableSchema);
 
     List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema,
@@ -153,6 +159,18 @@ protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName,
       getConfig().getDataset(), getConfig().isTruncateTableSet(), collector);
 
     Configuration configuration = new Configuration(baseConfiguration);
+    if (table != null) {
+      com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
+      if (bqSchema != null) {
+        String jsonStringFields = BigQuerySinkUtils.getJsonStringFieldsFromBQSchema(bqSchema);
+        configuration.set(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, jsonStringFields);
+        BigQuerySinkUtils.setJsonStringFields(fields, jsonStringFields);
+      }
+    }
+
+    if (getConfig().getJsonStringFields() != null && !getConfig().getJsonStringFields().isEmpty()) {
+      BigQuerySinkUtils.setJsonStringFields(fields, getConfig().getJsonStringFields());
+    }
 
     // Build GCS storage path for this bucket output.
     String temporaryGcsPath = BigQuerySinkUtils.getTemporaryGcsPath(bucket, runUUID.toString(), tableName);
@@ -229,6 +247,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
       config.isAllowSchemaRelaxation());
     baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
       config.getWriteDisposition().name());
+    baseConfiguration.setStrings(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, config.getJsonStringFields());
     // this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
     // cause OOM issue if there are many tables being written. See this - CDAP-16670
     String gcsChunkSize = "8388608";
@@ -310,4 +329,82 @@ protected Configuration getOutputConfiguration() throws IOException {
     return configuration;
   }
 
+  /**
+   * Validates that the fields to be converted to JSON strings are present in the Output Schema
+   * and are of type STRING.
+   *
+   * @param schema output schema
+   * @param jsonStringFields comma-separated list of fields to be converted to JSON strings
+   * @param collector FailureCollector to collect errors
+   */
+  public void validateJsonStringFields(Schema schema, String jsonStringFields, FailureCollector collector) {
+    Set<String> jsonFields = new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
+    Set<String> jsonFieldsValidated = new HashSet<>();
+    validateJsonStringFields(schema, jsonFields, new ArrayList<>(), collector, jsonFieldsValidated);
+    jsonFields.removeAll(jsonFieldsValidated);
+    if (!jsonFields.isEmpty()) {
+      collector.addFailure(String.format("Field(s) '%s' are not present in the Output Schema.",
+                                         String.join(", ", jsonFields)),
+                           "Remove the field(s) from the list of fields to be converted to JSON strings.")
+        .withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+    }
+  }
+
+  private void validateJsonStringFields(Schema schema, Set<String> jsonFields, ArrayList<String> path,
+                                        FailureCollector collector, Set<String> jsonFieldsValidated) {
+    String fieldPath = String.join(".", path);
+    String actionMessage = "Only type 'STRING' is supported.";
+
+    Schema.LogicalType logicalType = schema.isNullable() ? schema.getNonNullable().getLogicalType() :
+      schema.getLogicalType();
+    if (logicalType != null && jsonFields.contains(fieldPath)) {
+      collector.addFailure(
+          String.format("Field '%s' is of type '%s' which is not supported for conversion to JSON string.",
+                        fieldPath, logicalType),
+          actionMessage).withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+      return;
+    }
+    Schema.Type type = getEffectiveType(schema);
+    List<Schema.Field> fields = getEffectiveFields(schema);
+    String errorMessage = String.format(
+      "Field '%s' is of type '%s' which is not supported for conversion to JSON string.", fieldPath, type);
+
+    if (type == Schema.Type.RECORD && fields != null) {
+      if (jsonFields.contains(fieldPath)) {
+        collector.addFailure(errorMessage, actionMessage)
+          .withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+      }
+      for (Schema.Field field : fields) {
+        path.add(field.getName());
+        validateJsonStringFields(field.getSchema(), jsonFields, path, collector, jsonFieldsValidated);
+        path.remove(path.size() - 1);
+      }
+    } else {
+      jsonFieldsValidated.add(fieldPath);
+      if (type != Schema.Type.STRING && jsonFields.contains(fieldPath)) {
+        collector.addFailure(errorMessage, actionMessage)
+          .withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+      }
+    }
+  }
+
+  private static Schema.Type getEffectiveType(Schema schema) {
+    Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
+    if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
+      return nonNullableSchema.getComponentSchema().isNullable() ?
+        nonNullableSchema.getComponentSchema().getNonNullable().getType() :
+        nonNullableSchema.getComponentSchema().getType();
+    }
+    return nonNullableSchema.getType();
+  }
+
+  private static List<Schema.Field> getEffectiveFields(Schema schema) {
+    Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
+    if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
+      return nonNullableSchema.getComponentSchema().isNullable() ?
+        nonNullableSchema.getComponentSchema().getNonNullable().getFields() :
+        nonNullableSchema.getComponentSchema().getFields();
+    }
+    return nonNullableSchema.getFields();
+  }
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java
index b7a2ae234c..83ddc29cc0 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java
@@ -51,6 +51,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
   private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
   protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
   private static final String SCHEME = "gs://";
+  protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";
 
   @Name(Constants.Reference.REFERENCE_NAME)
   @Nullable
@@ -84,6 +85,12 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
     "This value is ignored if the dataset or temporary bucket already exist.")
   protected String location;
 
+  @Name(NAME_JSON_STRING_FIELDS)
+  @Nullable
+  @Description("Fields in input schema that should be treated as JSON strings. " +
+    "The schema of these fields should be of type STRING.")
+  protected String jsonStringFields;
+
   public AbstractBigQuerySinkConfig(BigQueryConnectorConfig connection, String dataset, String cmekKey,
                                     String bucket) {
     super(connection, dataset, cmekKey, bucket);
   }
@@ -114,6 +121,11 @@ public String getGcsChunkSize() {
     return gcsChunkSize;
   }
 
+  @Nullable
+  public String getJsonStringFields() {
+    return jsonStringFields;
+  }
+
   public boolean isAllowSchemaRelaxation() {
     return allowSchemaRelaxation == null ? false : allowSchemaRelaxation;
   }
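For reference, a sketch of how the validation added above surfaces a non-STRING target, following the new AbstractBigQuerySinkTest at the end of this patch (the `new BigQuerySink(null)` construction and field names come from that test):

```java
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink;

class ValidateJsonStringFieldsSketch {
  public static void main(String[] args) {
    MockFailureCollector collector = new MockFailureCollector();
    Schema schema = Schema.recordOf("test",
      Schema.Field.of("id", Schema.of(Schema.Type.INT)),
      Schema.Field.of("objectJson", Schema.of(Schema.Type.STRING)));
    // "objectJson" passes; "id" is INT, so one validation failure is collected.
    new BigQuerySink(null).validateJsonStringFields(schema, "objectJson,id", collector);
    System.out.println(collector.getValidationFailures().size()); // 1
  }
}
```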
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryJsonConverter.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryJsonConverter.java
index a1066eb748..b1ed4dddac 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryJsonConverter.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryJsonConverter.java
@@ -23,12 +23,21 @@
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Set;
 import javax.annotation.Nullable;
 
 /**
  * BigQueryJsonConverter converts a {@link StructuredRecord} to {@link JsonObject}
  */
 public class BigQueryJsonConverter extends RecordConverter<StructuredRecord, JsonObject> {
+  private Set<String> jsonStringFieldsPaths;
+
+  public BigQueryJsonConverter() {
+  }
+
+  public BigQueryJsonConverter(Set<String> jsonStringFieldsPaths) {
+    this.jsonStringFieldsPaths = jsonStringFieldsPaths;
+  }
 
   @Override
   public JsonObject transform(StructuredRecord input, @Nullable Schema schema) throws IOException {
@@ -40,7 +49,7 @@ public JsonObject transform(StructuredRecord input, @Nullable Schema schema) throws IOException {
         continue;
       }
       BigQueryRecordToJson.write(writer, recordField.getName(), input.get(recordField.getName()),
-                                 recordField.getSchema());
+                                 recordField.getSchema(), jsonStringFieldsPaths);
     }
     writer.endObject();
     return writer.get().getAsJsonObject();
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSink.java
index efa1c3539b..9e242c1ea0 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSink.java
@@ -29,6 +29,7 @@
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
 import io.cdap.cdap.etl.api.connector.Connector;
@@ -70,6 +71,15 @@ protected BigQueryMultiSinkConfig getConfig() {
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
     config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
     super.configurePipeline(pipelineConfigurer);
+
+    StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
+    FailureCollector collector = configurer.getFailureCollector();
+    Schema inputSchema = configurer.getInputSchema();
+    String jsonStringFields = config.getJsonStringFields();
+    if (jsonStringFields != null && inputSchema != null) {
+      validateJsonStringFields(inputSchema, jsonStringFields, collector);
+    }
+    collector.getOrThrowException();
   }
 
   @Override
@@ -134,7 +144,7 @@ protected void configureOutputSchemas(BatchSinkContext context,
         outputName = sanitizeOutputName(outputName);
         initOutput(context, bigQuery, outputName,
                    BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), tableName),
-                   tableName, tableSchema, bucket, context.getFailureCollector(), tableName);
+                   tableName, tableSchema, bucket, context.getFailureCollector(), tableName, table);
       } catch (IOException e) {
         collector.addFailure("Invalid schema: " + e.getMessage(), null);
       }
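The configured field list travels to the record writer through the Hadoop Configuration; a sketch of the round trip, using the key constant this patch adds to BigQueryConstants and mirroring the parsing in BigQueryOutputFormat.getRecordWriter below:

```java
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class JsonFieldsConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, "jsonString,nestedRecord.nestedJsonString");
    // An absent key means no fields are treated as JSON strings.
    String raw = conf.get(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, null);
    Set<String> jsonFields = raw == null ? Collections.emptySet()
      : new HashSet<>(Arrays.asList(raw.split(",")));
    System.out.println(jsonFields); // [jsonString, nestedRecord.nestedJsonString]
  }
}
```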
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
index dd6cb05bcb..e348f417fe 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
@@ -87,9 +87,11 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -121,9 +123,12 @@ public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext,
                                      io.cdap.cdap.api.data.schema.Schema schema)
     throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
+    String jsonStringFields = configuration.get(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, null);
+    Set<String> jsonFields = jsonStringFields == null ? Collections.emptySet() :
+      new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
     return new BigQueryRecordWriter(getDelegate(configuration).getRecordWriter(taskAttemptContext),
                                     BigQueryOutputConfiguration.getFileFormat(configuration),
-                                    schema);
+                                    schema, jsonFields);
   }
 
   private io.cdap.cdap.api.data.schema.Schema getOutputSchema(Configuration configuration) throws IOException {
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java
index f3882d0d0c..ec2c70edb7 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java
@@ -16,6 +16,11 @@
 package io.cdap.plugin.gcp.bigquery.sink;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import com.google.gson.stream.JsonWriter;
 import io.cdap.cdap.api.common.Bytes;
 import io.cdap.cdap.api.data.format.StructuredRecord;
@@ -34,11 +39,14 @@
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -56,8 +64,11 @@ public final class BigQueryRecordToJson {
    * @param object object to be written
    * @param fieldSchema field schema to be written
    */
-  public static void write(JsonWriter writer, String name, Object object, Schema fieldSchema) throws IOException {
-    write(writer, name, false, object, fieldSchema);
+  public static void write(JsonWriter writer, String name, Object object, Schema fieldSchema,
+                           Set<String> jsonStringFieldsPaths) throws IOException {
+    List<String> path = new ArrayList<>();
+    path.add(name);
+    write(writer, name, false, object, fieldSchema, path, jsonStringFieldsPaths);
   }
@@ -70,7 +81,8 @@
    * @param fieldSchema field schema to be written
    */
   private static void write(JsonWriter writer, String name, boolean isArrayItem, Object object,
-                            Schema fieldSchema) throws IOException {
+                            Schema fieldSchema, List<String> path,
+                            Set<String> jsonStringFieldsPaths) throws IOException {
     Schema schema = BigQueryUtil.getNonNullableSchema(fieldSchema);
     switch (schema.getType()) {
       case NULL:
@@ -81,13 +93,13 @@ private static void write(JsonWriter writer, String name, boolean isArrayItem, Object object,
       case BOOLEAN:
       case STRING:
       case BYTES:
-        writeSimpleTypes(writer, name, isArrayItem, object, schema);
+        writeSimpleTypes(writer, name, isArrayItem, object, schema, path, jsonStringFieldsPaths);
         break;
       case ARRAY:
-        writeArray(writer, name, object, schema);
+        writeArray(writer, name, object, schema, path, jsonStringFieldsPaths);
         break;
       case RECORD:
-        writeRecord(writer, name, object, schema);
+        writeRecord(writer, name, object, schema, path, jsonStringFieldsPaths);
         break;
       default:
         throw new IllegalStateException(
@@ -105,7 +117,8 @@
    * @param schema field schema to be written
    */
   private static void writeSimpleTypes(JsonWriter writer, String name, boolean isArrayItem, Object object,
-                                       Schema schema) throws IOException {
+                                       Schema schema, List<String> path,
+                                       Set<String> jsonStringFieldsPaths) throws IOException {
     if (!isArrayItem) {
       writer.name(name);
     }
@@ -115,6 +128,8 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isArrayItem, Object object,
       return;
     }
 
+    String pathString = String.join(".", path);
+
     Schema.LogicalType logicalType = schema.getLogicalType();
     if (logicalType != null) {
       switch (logicalType) {
@@ -166,6 +181,19 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isArrayItem, Object object,
         writer.value((Boolean) object);
         break;
       case STRING:
+        if (jsonStringFieldsPaths.contains(pathString)) {
+          Gson gson = new Gson();
+          String jsonString = object.toString();
+          if (jsonString.startsWith("{") && jsonString.endsWith("}")) {
+            writeJsonObjectToWriter(gson.fromJson(jsonString, JsonObject.class), writer);
+          } else if (jsonString.startsWith("[") && jsonString.endsWith("]")) {
+            writeJsonArrayToWriter(gson.fromJson(jsonString, JsonArray.class), writer);
+          } else {
+            throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +
+                                                            "object or array.", name));
+          }
+          break;
+        }
         writer.value(object.toString());
         break;
       case BYTES:
@@ -187,7 +215,8 @@
   private static void writeArray(JsonWriter writer,
                                  String name,
                                  @Nullable Object value,
-                                 Schema fieldSchema) throws IOException {
+                                 Schema fieldSchema,
+                                 List<String> path, Set<String> jsonStringFieldsPaths) throws IOException {
     writer.name(name);
     writer.beginArray();
@@ -221,9 +250,14 @@ private static void writeArray(JsonWriter writer,
       }
       if (element instanceof StructuredRecord) {
         StructuredRecord record = (StructuredRecord) element;
-        processRecord(writer, record, Objects.requireNonNull(record.getSchema().getFields()));
+        path.add(name);
+        processRecord(writer, record, Objects.requireNonNull(record.getSchema().getFields()),
+                      path, jsonStringFieldsPaths);
+        path.remove(path.size() - 1);
       } else {
-        write(writer, name, true, element, componentSchema);
+        path.add(name);
+        write(writer, name, true, element, componentSchema, path, jsonStringFieldsPaths);
+        path.remove(path.size() - 1);
       }
     }
   }
@@ -233,7 +267,8 @@
   private static void writeRecord(JsonWriter writer,
                                   String name,
                                   @Nullable Object value,
-                                  Schema fieldSchema) throws IOException {
+                                  Schema fieldSchema,
+                                  List<String> path, Set<String> jsonStringFieldsPaths) throws IOException {
     if (value == null) {
       writer.name(name);
       writer.nullValue();
@@ -247,15 +282,20 @@ private static void writeRecord(JsonWriter writer,
     }
 
     writer.name(name);
-    processRecord(writer, (StructuredRecord) value, Objects.requireNonNull(fieldSchema.getFields()));
+    processRecord(writer, (StructuredRecord) value, Objects.requireNonNull(fieldSchema.getFields()),
+                  path, jsonStringFieldsPaths);
   }
 
   private static void processRecord(JsonWriter writer,
                                     StructuredRecord record,
-                                    List<Schema.Field> fields) throws IOException {
+                                    List<Schema.Field> fields,
+                                    List<String> path, Set<String> jsonStringFieldsPaths) throws IOException {
     writer.beginObject();
     for (Schema.Field field : fields) {
-      write(writer, field.getName(), record.get(field.getName()), field.getSchema());
+      path.add(field.getName());
+      write(writer, field.getName(), false, record.get(field.getName()), field.getSchema(), path,
+            jsonStringFieldsPaths);
+      path.remove(path.size() - 1);
     }
     writer.endObject();
   }
@@ -287,4 +327,47 @@ private static BigDecimal getDecimal(String name, byte[] value, Schema schema) {
   private BigQueryRecordToJson() {
     //no-op
   }
+
+  private static void writeJsonObjectToWriter(JsonObject jsonObject, JsonWriter jsonWriter) throws IOException {
+    jsonWriter.beginObject();
+    for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
+      String key = entry.getKey();
+      JsonElement value = entry.getValue();
+
+      jsonWriter.name(key);
+      writeJsonElementToWriter(value, jsonWriter); // Recursively write the value
+    }
+    jsonWriter.endObject();
+  }
+
+  private static void writeJsonElementToWriter(JsonElement jsonElement, JsonWriter jsonWriter) throws IOException {
+    if (jsonElement.isJsonObject()) {
+      writeJsonObjectToWriter(jsonElement.getAsJsonObject(), jsonWriter);
+    } else if (jsonElement.isJsonArray()) {
+      writeJsonArrayToWriter(jsonElement.getAsJsonArray(), jsonWriter);
+    } else if (jsonElement.isJsonPrimitive()) {
+      writeJsonPrimitiveToWriter(jsonElement.getAsJsonPrimitive(), jsonWriter);
+    } else if (jsonElement.isJsonNull()) {
+      jsonWriter.nullValue();
+    }
+  }
+
+  private static void writeJsonArrayToWriter(JsonArray jsonArray, JsonWriter jsonWriter) throws IOException {
+    jsonWriter.beginArray();
+    for (JsonElement element : jsonArray) {
+      writeJsonElementToWriter(element, jsonWriter); // Recursively write array elements
+    }
+    jsonWriter.endArray();
+  }
+
+  private static void writeJsonPrimitiveToWriter(JsonPrimitive jsonPrimitive, JsonWriter jsonWriter)
+    throws IOException {
+    if (jsonPrimitive.isNumber()) {
+      jsonWriter.value(jsonPrimitive.getAsNumber());
+    } else if (jsonPrimitive.isBoolean()) {
+      jsonWriter.value(jsonPrimitive.getAsBoolean());
+    } else if (jsonPrimitive.isString()) {
+      jsonWriter.value(jsonPrimitive.getAsString());
+    }
+  }
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordWriter.java
index 512bb002b3..14def9f7b7 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordWriter.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordWriter.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.Set;
 import javax.annotation.Nullable;
 
 /**
@@ -35,17 +36,20 @@ public class BigQueryRecordWriter extends RecordWriter<StructuredRecord, NullWritable> {
   private final BigQueryFileFormat fileFormat;
   @Nullable
   private final Schema outputSchema;
+  private Set<String> jsonStringFieldsPaths;
 
-  public BigQueryRecordWriter(RecordWriter delegate, BigQueryFileFormat fileFormat, @Nullable Schema outputSchema) {
+  public BigQueryRecordWriter(RecordWriter delegate, BigQueryFileFormat fileFormat, @Nullable Schema outputSchema,
+                              Set<String> jsonStringFieldsPaths) {
     this.delegate = delegate;
     this.fileFormat = fileFormat;
     this.outputSchema = outputSchema;
+    this.jsonStringFieldsPaths = jsonStringFieldsPaths;
     initRecordConverter();
   }
 
   private void initRecordConverter() {
     if (this.fileFormat == BigQueryFileFormat.NEWLINE_DELIMITED_JSON) {
-      recordConverter = new BigQueryJsonConverter();
+      recordConverter = new BigQueryJsonConverter(jsonStringFieldsPaths);
       return;
     }
     recordConverter = new BigQueryAvroConverter();
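Putting the pieces together, a minimal sketch of the JSON write path these three files now form: NEWLINE_DELIMITED_JSON writes go through BigQueryJsonConverter, which is constructed with the JSON-string paths. Field names here are hypothetical.

```java
import com.google.common.collect.ImmutableSet;
import com.google.gson.JsonObject;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.gcp.bigquery.sink.BigQueryJsonConverter;

import java.io.IOException;

class JsonConverterSketch {
  public static void main(String[] args) throws IOException {
    Schema schema = Schema.recordOf("record",
      Schema.Field.of("raw", Schema.of(Schema.Type.STRING)));
    StructuredRecord record = StructuredRecord.builder(schema)
      .set("raw", "[1,2,3]")
      .build();
    // Arrays are expanded too, via writeJsonArrayToWriter.
    BigQueryJsonConverter converter = new BigQueryJsonConverter(ImmutableSet.of("raw"));
    JsonObject row = converter.transform(record, schema);
    System.out.println(row); // {"raw":[1,2,3]}
  }
}
```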
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
index e8df636404..5445906e6d 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
@@ -109,6 +109,10 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
     if (schema != null) {
       validateConfiguredSchema(schema, collector);
     }
+
+    if (config.getJsonStringFields() != null && schema != null) {
+      validateJsonStringFields(schema, config.getJsonStringFields(), collector);
+    }
   }
 
   @Override
@@ -132,9 +136,12 @@ protected void prepareRunInternal(BatchSinkContext context, BigQuery bigQuery, String bucket)
     configureTable(outputSchema);
     configureBigQuerySink();
+    Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), config.getTable(),
+                                                config.getServiceAccount(), config.isServiceAccountFilePath(),
+                                                collector);
     initOutput(context, bigQuery, config.getReferenceName(),
                BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()),
-               config.getTable(), outputSchema, bucket, collector, null);
+               config.getTable(), outputSchema, bucket, collector, null, table);
     initSQLEngineOutput(context, bigQuery, config.getReferenceName(), context.getStageName(), config.getTable(),
                        outputSchema, collector);
   }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
index fe53a04139..af6c244834 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
@@ -77,6 +77,7 @@ public final class BigQuerySinkUtils {
   private static final String TEMPORARY_BUCKET_FORMAT = GS_PATH_FORMAT + "/input/%s-%s";
   private static final String DATETIME = "DATETIME";
   private static final String RECORD = "RECORD";
+  private static final String JSON = "JSON";
   private static final Gson GSON = new Gson();
   private static final Type LIST_OF_FIELD_TYPE = new TypeToken<List<BigQueryTableFieldSchema>>() { }.getType();
@@ -279,6 +280,34 @@ public static String configureBucket(Configuration baseConfiguration,
     baseConfiguration.setBoolean("fs.gs.metadata.cache.enable", false);
     return bucket;
   }
+
+  /**
+   * Sets string fields to JSON type if they are present in the provided list of fields.
+   *
+   * @param fields list of BigQuery table fields
+   * @param jsonStringFields comma-separated list of fields that should be set to JSON type
+   */
+  public static void setJsonStringFields(List<BigQueryTableFieldSchema> fields, String jsonStringFields) {
+    Set<String> jsonFields = new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
+    setJsonStringFields(fields, jsonFields, new ArrayList<>());
+  }
+
+  private static void setJsonStringFields(List<BigQueryTableFieldSchema> fields, Set<String> jsonFields,
+                                          List<String> path) {
+    for (BigQueryTableFieldSchema field : fields) {
+      String fieldName = field.getName();
+      String fieldType = field.getType();
+      String separator = path.isEmpty() ? "" : ".";
+      String fieldPath = String.join(".", path) + separator + fieldName;
+      if (jsonFields.contains(fieldPath) && fieldType.equals(LegacySQLTypeName.STRING.name())) {
+        field.setType(LegacySQLTypeName.valueOf(JSON).name());
+      } else if (field.getType().equals(LegacySQLTypeName.RECORD.name())) {
+        path.add(fieldName);
+        setJsonStringFields(field.getFields(), jsonFields, path);
+        path.remove(path.size() - 1);
+      }
+    }
+  }
 
   /**
    * Configures output for Sink
@@ -299,6 +328,10 @@ public static void configureOutput(Configuration configuration,
     // Set up table schema
     BigQueryTableSchema outputTableSchema = new BigQueryTableSchema();
     if (!fields.isEmpty()) {
+      String jsonStringFields = configuration.get(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, null);
+      if (jsonStringFields != null) {
+        setJsonStringFields(fields, jsonStringFields);
+      }
       outputTableSchema.setFields(fields);
     }
@@ -919,4 +952,26 @@ public static void recordLineage(BatchSinkContext context,
     }
   }
 
+  /**
+   * Gets the list of fields that are of type JSON from the BigQuery schema.
+   *
+   * @param bqSchema BigQuery schema
+   * @return comma-separated list of fields that are of type JSON
+   */
+  public static String getJsonStringFieldsFromBQSchema(com.google.cloud.bigquery.Schema bqSchema) {
+    ArrayList<String> fields = new ArrayList<>();
+    getJsonStringFieldsFromBQSchema(bqSchema.getFields(), fields, new ArrayList<>());
+    return String.join(",", fields);
+  }
+
+  private static void getJsonStringFieldsFromBQSchema(FieldList fieldList,
+                                                      ArrayList<String> fields, ArrayList<String> path) {
+    for (Field field : fieldList) {
+      path.add(field.getName());
+      if (field.getType() == LegacySQLTypeName.RECORD) {
+        getJsonStringFieldsFromBQSchema(field.getSubFields(), fields, path);
+      } else if (field.getType().equals(LegacySQLTypeName.valueOf(JSON))) {
+        fields.add(String.join(".", path));
+      }
+      path.remove(path.size() - 1);
+    }
+  }
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java
index 110b58e025..908e299681 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java
@@ -34,6 +34,7 @@ public interface BigQueryConstants {
   String CONFIG_TABLE_KEY = "cdap.bq.sink.table.key";
   String CONFIG_DEDUPE_BY = "cdap.bq.sink.dedupe.by";
   String CONFIG_TABLE_FIELDS = "cdap.bq.sink.table.fields";
+  String CONFIG_JSON_STRING_FIELDS = "cdap.bq.sink.json.string.fields";
   String CONFIG_FILTER = "cdap.bq.source.filter";
   String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter";
   String CONFIG_JOB_ID = "cdap.bq.sink.job.id";
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
index 831851a272..468f162dfa 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -113,7 +113,8 @@ public final class BigQueryUtil {
     Set<LegacySQLTypeName>>builder()
     .put(Schema.Type.INT, ImmutableSet.of(LegacySQLTypeName.INTEGER))
     .put(Schema.Type.LONG, ImmutableSet.of(LegacySQLTypeName.INTEGER))
-    .put(Schema.Type.STRING, ImmutableSet.of(LegacySQLTypeName.STRING, LegacySQLTypeName.DATETIME))
+    .put(Schema.Type.STRING, ImmutableSet.of(LegacySQLTypeName.STRING, LegacySQLTypeName.DATETIME,
+                                             LegacySQLTypeName.valueOf("JSON")))
     .put(Schema.Type.FLOAT, ImmutableSet.of(LegacySQLTypeName.FLOAT))
     .put(Schema.Type.DOUBLE, ImmutableSet.of(LegacySQLTypeName.FLOAT))
     .put(Schema.Type.BOOLEAN, ImmutableSet.of(LegacySQLTypeName.BOOLEAN))
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java
index de3a17a90a..962a44fd28 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java
@@ -17,6 +17,9 @@
 package io.cdap.plugin.gcp.bigquery;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.internal.bind.JsonTreeWriter;
@@ -24,6 +27,7 @@
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -35,12 +39,20 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Tests for {@link BigQueryRecordToJson}.
  */
 public class BigQueryRecordToJsonTest {
 
+  Gson gson;
+
+  @Before
+  public void setUp() {
+    gson = new Gson();
+  }
+
   @Test
   public void test() throws IOException {
     Schema schema = Schema.recordOf(
@@ -63,13 +75,14 @@ public void test() throws IOException {
       .set("bytes2", ByteBuffer.wrap(bytes))
       .setDateTime("datetime", localDateTime)
       .build();
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("raw");
 
     try (JsonTreeWriter writer = new JsonTreeWriter()) {
       writer.beginObject();
       for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
         if (schema.getField(recordField.getName()) != null) {
           BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
-                                     recordField.getSchema());
+                                     recordField.getSchema(), jsonStringFieldsPaths);
         }
       }
       writer.endObject();
@@ -102,12 +115,13 @@ public void testInvalidBytes() throws IOException {
       .set("bytes", "test")
       .build();
 
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("raw");
     try (JsonTreeWriter writer = new JsonTreeWriter()) {
       writer.beginObject();
       for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
         if (schema.getField(recordField.getName()) != null) {
           BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
-                                     recordField.getSchema());
+                                     recordField.getSchema(), jsonStringFieldsPaths);
         }
       }
       writer.endObject();
@@ -147,13 +161,13 @@ public void testArrayOfRecords() throws IOException {
           .build()))
       .set("name", "Tod")
       .build();
-
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("raw");
     try (JsonTreeWriter writer = new JsonTreeWriter()) {
       writer.beginObject();
       for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
         if (schema.getField(recordField.getName()) != null) {
           BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
-                                     recordField.getSchema());
+                                     recordField.getSchema(), jsonStringFieldsPaths);
         }
       }
       writer.endObject();
@@ -214,7 +228,7 @@ public void testNullableRecord() throws IOException {
       for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
         if (schema.getField(recordField.getName()) != null) {
           BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
-                                     recordField.getSchema());
+                                     recordField.getSchema(), null);
         }
       }
       writer.endObject();
@@ -225,4 +239,212 @@ public void testNullableRecord() throws IOException {
       Assert.assertEquals(42,
         actual.get("base").getAsJsonObject().get("innerB").getAsJsonObject().get("int").getAsInt());
     }
   }
+
+  @Test
+  public void testNestedRecordWithJsonString() throws IOException {
+    Schema nestedRecordSchema = Schema.recordOf(
+      "nestedRecord",
+      Schema.Field.of("nestedString", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("nestedJsonString", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("nestedInt", Schema.of(Schema.Type.INT))
+    );
+
+    Schema recordSchema = Schema.recordOf(
+      "record",
+      Schema.Field.of("string", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("jsonString", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("int", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("nestedRecord", nestedRecordSchema)
+    );
+
+    String jsonString = "{\"string\":\"string\",\"int\":1}";
+    String nestedJsonString = "{\"nestedString\":\"nestedString\",\"nestedInt\":1}";
+
+    StructuredRecord nestedRecord = StructuredRecord.builder(nestedRecordSchema)
+      .set("nestedString", "nestedString")
+      .set("nestedJsonString", nestedJsonString)
+      .set("nestedInt", 1)
+      .build();
+
+    StructuredRecord record = StructuredRecord.builder(recordSchema)
+      .set("string", "string")
+      .set("jsonString", jsonString)
+      .set("int", 1)
+      .set("nestedRecord", nestedRecord)
+      .build();
+
+    JsonObject jsonObject = gson.fromJson(jsonString, JsonObject.class);
+    JsonObject nestedJsonObject = gson.fromJson(nestedJsonString, JsonObject.class);
+
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("jsonString", "nestedRecord.nestedJsonString");
+    try (JsonTreeWriter writer = new JsonTreeWriter()) {
+      writer.beginObject();
+      for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
+        if (recordSchema.getField(recordField.getName()) != null) {
+          BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
+                                     recordField.getSchema(), jsonStringFieldsPaths);
+        }
+      }
+      writer.endObject();
+
+      JsonObject actual = writer.get().getAsJsonObject();
+
+      Assert.assertEquals(jsonObject, actual.get("jsonString").getAsJsonObject());
+      Assert.assertEquals(nestedJsonObject, actual.get("nestedRecord").getAsJsonObject().get("nestedJsonString")
+        .getAsJsonObject());
+    }
+  }
+
+  @Test
+  public void testJsonStringWithNestedObjects() throws IOException {
+    Schema recordSchema = Schema.recordOf(
+      "record",
+      Schema.Field.of("jsonString", Schema.of(Schema.Type.STRING))
+    );
+
+    String jsonString = "{\n" +
+      "  \"string\": \"string\",\n" +
+      "  \"int\": 1,\n" +
+      "  \"bool\": true,\n" +
+      "  \"null\": null,\n" +
+      "  \"nestedObject\": {\n" +
+      "    \"nestedString\": \"nestedString\",\n" +
+      "    \"nestedInt\": 1,\n" +
+      "    \"nestedBool\": true,\n" +
+      "    \"nestedNull\": null\n" +
+      "  },\n" +
+      "  \"array\": [\n" +
+      "    \"string\",\n" +
+      "    1,\n" +
+      "    true,\n" +
+      "    null,\n" +
+      "    {\n" +
+      "      \"nestedString\": \"nestedString\",\n" +
+      "      \"nestedInt\": 1,\n" +
+      "      \"nestedBool\": true,\n" +
+      "      \"nestedNull\": null\n" +
+      "    }\n" +
+      "  ],\n" +
+      "  \"nestedArray\": [\n" +
+      "    [\n" +
+      "      \"string\",\n" +
+      "      1,\n" +
+      "      true,\n" +
+      "      null,\n" +
+      "      {\n" +
+      "        \"nestedString\": \"nestedString\",\n" +
+      "        \"nestedInt\": 1,\n" +
+      "        \"nestedBool\": true,\n" +
+      "        \"nestedNull\": null\n" +
+      "      }\n" +
+      "    ]\n" +
+      "  ],\n" +
+      "  \"nestedObjectArray\": [\n" +
+      "    {\n" +
+      "      \"nestedString\": \"nestedString\",\n" +
+      "      \"nestedInt\": 1,\n" +
+      "      \"nestedBool\": true,\n" +
+      "      \"nestedNull\": null\n" +
+      "    }\n" +
+      "  ]\n" +
+      "}";
+    JsonObject jsonObject = gson.fromJson(jsonString, JsonObject.class);
+
+    StructuredRecord record = StructuredRecord.builder(recordSchema).set("jsonString", jsonString).build();
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("jsonString");
+
+    try (JsonTreeWriter writer = new JsonTreeWriter()) {
+      writer.beginObject();
+      for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
+        if (recordSchema.getField(recordField.getName()) != null) {
+          BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
+                                     recordField.getSchema(), jsonStringFieldsPaths);
+        }
+      }
+      writer.endObject();
+
+      JsonObject actual = writer.get().getAsJsonObject();
+
+      Assert.assertEquals(jsonObject, actual.get("jsonString").getAsJsonObject());
+    }
+  }
+
+  @Test
+  public void testJsonStringWithEmptyObject() throws IOException {
+    Schema recordSchema = Schema.recordOf(
+      "record",
+      Schema.Field.of("jsonString", Schema.of(Schema.Type.STRING))
+    );
+    String jsonString = "{}";
+    JsonObject jsonObject = gson.fromJson(jsonString, JsonObject.class);
+    StructuredRecord record = StructuredRecord.builder(recordSchema).set("jsonString", jsonString).build();
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("jsonString");
+    try (JsonTreeWriter writer = new JsonTreeWriter()) {
+      writer.beginObject();
+      for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
+        if (recordSchema.getField(recordField.getName()) != null) {
+          BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
+                                     recordField.getSchema(), jsonStringFieldsPaths);
+        }
+      }
+      writer.endObject();
+
+      JsonObject actual = writer.get().getAsJsonObject();
+
+      Assert.assertEquals(jsonObject, actual.get("jsonString").getAsJsonObject());
+    }
+  }
+
+  @Test
+  public void testJsonStringWithEmptyArray() throws IOException {
+    Schema recordSchema = Schema.recordOf(
+      "record",
+      Schema.Field.of("jsonString", Schema.of(Schema.Type.STRING))
+    );
+    String jsonString = "[]";
+    JsonArray jsonArray = gson.fromJson(jsonString, JsonArray.class);
+    StructuredRecord record = StructuredRecord.builder(recordSchema).set("jsonString", jsonString).build();
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("jsonString");
+    try (JsonTreeWriter writer = new JsonTreeWriter()) {
+      writer.beginObject();
+      for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
+        if (recordSchema.getField(recordField.getName()) != null) {
+          BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
+                                     recordField.getSchema(), jsonStringFieldsPaths);
+        }
+      }
+      writer.endObject();
+
+      JsonObject actual = writer.get().getAsJsonObject();
+
+      Assert.assertEquals(jsonArray, actual.get("jsonString").getAsJsonArray());
+    }
+  }
+
+  /**
+   * An empty string is not valid JSON and should throw an exception.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testEmptyJsonString() throws IOException {
+    Schema recordSchema = Schema.recordOf(
+      "record",
+      Schema.Field.of("jsonString", Schema.of(Schema.Type.STRING))
+    );
+    String jsonString = "";
+    StructuredRecord record = StructuredRecord.builder(recordSchema).set("jsonString", jsonString).build();
+    Set<String> jsonStringFieldsPaths = ImmutableSet.of("jsonString");
+    JsonTreeWriter writer = new JsonTreeWriter();
+    writer.beginObject();
+    for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) {
+      if (recordSchema.getField(recordField.getName()) != null) {
+        BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()),
+                                   recordField.getSchema(), jsonStringFieldsPaths);
+      }
+    }
+    writer.endObject();
+  }
 }
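As the tests above exercise, a configured value that parses as neither a JSON object nor an array fails fast at write time. A compact sketch (hypothetical field name), outside the patch itself:

```java
import com.google.common.collect.ImmutableSet;
import com.google.gson.internal.bind.JsonTreeWriter;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson;

import java.io.IOException;

class InvalidJsonStringSketch {
  public static void main(String[] args) throws IOException {
    Schema fieldSchema = Schema.of(Schema.Type.STRING);
    try (JsonTreeWriter writer = new JsonTreeWriter()) {
      writer.beginObject();
      // "not json" is neither an object nor an array, so the write fails fast.
      BigQueryRecordToJson.write(writer, "jsonString", "not json", fieldSchema,
                                 ImmutableSet.of("jsonString"));
      writer.endObject();
    } catch (IllegalStateException e) {
      // Expected value of Field 'jsonString' to be a valid JSON object or array.
      System.out.println(e.getMessage());
    }
  }
}
```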
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkTest.java
new file mode 100644
index 0000000000..d756ee51d8
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkTest.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.bigquery.sink;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link AbstractBigQuerySink}.
+ */
+public class AbstractBigQuerySinkTest {
+  MockFailureCollector collector;
+  Schema notNestedSchema;
+  Schema nestedSchema;
+  Schema oneLevelNestedSchema;
+
+  @Before
+  public void setup() {
+    collector = new MockFailureCollector();
+    notNestedSchema = Schema.recordOf("test",
+      Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("age", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("objectJson", Schema.of(Schema.Type.STRING)));
+    nestedSchema = Schema.recordOf("nestedObject",
+      Schema.Field.of("nestedId", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("nestedName", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("nestedAge", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("nestedObjectJson", Schema.of(Schema.Type.STRING)));
+    oneLevelNestedSchema = Schema.recordOf("test",
+      Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("age", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("nested", nestedSchema));
+  }
+
+  @Test
+  public void testValidateJsonStringFieldsNoNesting() {
+    String jsonFields = "objectJson";
+    new BigQuerySink(null).validateJsonStringFields(notNestedSchema, jsonFields, collector);
+    Assert.assertEquals(0, collector.getValidationFailures().size());
+  }
+
+  @Test
+  public void testValidateJsonStringFieldsOneLevelNesting() {
+    String jsonFields = "nested.nestedObjectJson";
+    new BigQuerySink(null).validateJsonStringFields(oneLevelNestedSchema, jsonFields, collector);
+    Assert.assertEquals(0, collector.getValidationFailures().size());
+  }
+
+  @Test
+  public void testValidateJsonStringFieldsOneLevelNestingNotString() {
+    String jsonFields = "nested.nestedId";
+    new BigQuerySink(null).validateJsonStringFields(oneLevelNestedSchema, jsonFields, collector);
+    Assert.assertEquals(String.format(
+        "Field '%s' is of type '%s' which is not supported for conversion to JSON string.",
+        "nested.nestedId", Schema.Type.INT),
+      collector.getValidationFailures().stream().findFirst().get().getMessage());
+  }
+
+  @Test
+  public void testValidateJsonStringFieldsDoesNotExist() {
+    String jsonFields = "nested.nestedObjectJson";
+    new BigQuerySink(null).validateJsonStringFields(notNestedSchema, jsonFields, collector);
+    Assert.assertEquals(String.format("Field(s) '%s' are not present in the Output Schema.",
+                                      "nested.nestedObjectJson"),
+      collector.getValidationFailures().stream().findFirst().get().getMessage());
+  }
+}
diff --git a/widgets/BigQueryMultiTable-batchsink.json b/widgets/BigQueryMultiTable-batchsink.json
index b2b9eac124..f4bc14121c 100644
--- a/widgets/BigQueryMultiTable-batchsink.json
+++ b/widgets/BigQueryMultiTable-batchsink.json
@@ -130,6 +130,12 @@
     {
       "label": "Advanced",
       "properties": [
+        {
+          "name": "jsonStringFields",
+          "widget-type": "hidden",
+          "label": "JSON String",
+          "widget-attributes": {}
+        },
         {
           "widget-type": "textbox",
           "label": "Temporary Bucket Name",
diff --git a/widgets/BigQueryTable-batchsink.json b/widgets/BigQueryTable-batchsink.json
index dbd56cc7a9..63a89886f7 100644
--- a/widgets/BigQueryTable-batchsink.json
+++ b/widgets/BigQueryTable-batchsink.json
@@ -138,6 +138,12 @@
     {
       "label": "Advanced",
       "properties": [
+        {
+          "name": "jsonStringFields",
+          "widget-type": "csv",
+          "label": "JSON String",
+          "widget-attributes": {}
+        },
         {
           "widget-type": "radio-group",
           "name": "operation",