Merge pull request #1316 from cloudsufi/csm1365
[PLUGIN-1692] BQ Sink Json Support
vikasrathee-cs authored Oct 25, 2023
2 parents e6fe8d8 + 6f54654 commit 8cca139
Showing 16 changed files with 641 additions and 27 deletions.
7 changes: 7 additions & 0 deletions docs/BigQueryTable-batchsink.md
@@ -52,6 +52,13 @@ bucket will be created and then deleted after the run finishes.

**GCS Upload Request Chunk Size**: GCS upload request chunk size in bytes. Default value is 8388608 bytes.

**JSON String**: List of fields to be written to BigQuery as a JSON string.
The fields must be of type STRING. To target nested fields, use dot notation.
For example, 'name.first' will target the 'first' field in the 'name' record. (Macro Enabled)

To specify multiple fields, provide a comma-separated list without spaces around the commas (the same format applies when the value is supplied through a macro).
Example: "nestedObject.nestedArray.raw,nestedArray.raw".
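
For intuition, here is a minimal Gson sketch of the conversion this setting requests (the field name `payload` is illustrative, not part of the plugin):

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class JsonStringFieldDemo {
  public static void main(String[] args) {
    String payload = "{\"city\":\"Paris\",\"zip\":\"75001\"}";

    // Default behaviour: the STRING value is written as one escaped string.
    JsonObject asString = new JsonObject();
    asString.addProperty("payload", payload);
    System.out.println(asString); // {"payload":"{\"city\":\"Paris\",\"zip\":\"75001\"}"}

    // With the field listed in JSON String: the value is parsed and embedded as JSON.
    JsonObject asJson = new JsonObject();
    asJson.add("payload", JsonParser.parseString(payload));
    System.out.println(asJson); // {"payload":{"city":"Paris","zip":"75001"}}
  }
}
```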

**Operation**: Type of write operation to perform. This can be set to Insert, Update or Upsert.
* Insert - all records will be inserted into the destination table.
* Update - records that match on Table Key will be updated in the table. Records that do not match
@@ -19,6 +19,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Table;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
@@ -47,7 +48,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

@@ -145,14 +150,27 @@ public void transform(StructuredRecord input, Emitter<KeyValue<StructuredRecord,
*/
protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String fqn,
String tableName, @Nullable Schema tableSchema, String bucket,
FailureCollector collector, @Nullable String marker) throws IOException {
FailureCollector collector, @Nullable String marker,
Table table) throws IOException {
LOG.debug("Init output for table '{}' with schema: {}", tableName, tableSchema);

List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema,
getConfig().isAllowSchemaRelaxation(), getConfig().getDatasetProject(),
getConfig().getDataset(), getConfig().isTruncateTableSet(), collector);

Configuration configuration = new Configuration(baseConfiguration);
if (table != null) {
com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
if (bqSchema != null) {
String jsonStringFields = BigQuerySinkUtils.getJsonStringFieldsFromBQSchema(bqSchema);
configuration.set(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, jsonStringFields);
BigQuerySinkUtils.setJsonStringFields(fields, jsonStringFields);
}
}

if (getConfig().getJsonStringFields() != null && !getConfig().getJsonStringFields().isEmpty()) {
BigQuerySinkUtils.setJsonStringFields(fields, getConfig().getJsonStringFields());
}

// Build GCS storage path for this bucket output.
String temporaryGcsPath = BigQuerySinkUtils.getTemporaryGcsPath(bucket, runUUID.toString(), tableName);
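
`BigQuerySinkUtils.getJsonStringFieldsFromBQSchema` and `setJsonStringFields` are not part of this excerpt. A plausible sketch of the marking step, assuming it simply switches the declared type of each matching top-level field to BigQuery's JSON type (the real helper presumably also resolves dot-notation paths into nested fields):

```java
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of BigQuerySinkUtils.setJsonStringFields; top-level fields only. */
public final class JsonFieldMarker {
  public static void setJsonStringFields(List<BigQueryTableFieldSchema> fields,
                                         String jsonStringFields) {
    Set<String> jsonFields = new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
    for (BigQueryTableFieldSchema field : fields) {
      if (jsonFields.contains(field.getName())) {
        field.setType("JSON"); // BigQuery's native JSON column type
      }
    }
  }
}
```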
@@ -229,6 +247,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
config.isAllowSchemaRelaxation());
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
config.getWriteDisposition().name());
baseConfiguration.setStrings(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, config.getJsonStringFields());
// this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
// cause OOM issue if there are many tables being written. See this - CDAP-16670
String gcsChunkSize = "8388608";
@@ -310,4 +329,82 @@ protected Configuration getOutputConfiguration() throws IOException {
return configuration;
}

/**
* Validates that the fields to be converted to JSON strings are present in the Output Schema.
* @param schema Output Schema.
* @param jsonStringFields List of fields to be converted to JSON strings comma separated.
* @param collector FailureCollector to collect errors.
*/
public void validateJsonStringFields(Schema schema,
String jsonStringFields, FailureCollector collector) {
Set<String> jsonFields = new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
Set<String> jsonFieldsValidated = new HashSet<>();
validateJsonStringFields(schema, jsonFields, new ArrayList<>(), collector, jsonFieldsValidated);
jsonFields.removeAll(jsonFieldsValidated);
if (!jsonFields.isEmpty()) {
collector.addFailure(String.format("Field(s) '%s' are not present in the Output Schema.",
String.join(", ", jsonFields)),
"Remove the field(s) from the list of fields to be converted to JSON strings.")
.withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
}
}

private void validateJsonStringFields(Schema schema, Set<String> jsonFields, ArrayList<String> path,
FailureCollector collector, Set<String> jsonFieldsValidated) {
String fieldPath = String.join(".", path);
String actionMessage = "Only type 'STRING' is supported.";

Schema.LogicalType logicalType = schema.isNullable() ? schema.getNonNullable().getLogicalType() :
schema.getLogicalType();
if (logicalType != null && jsonFields.contains(fieldPath)) {
collector.addFailure(
String.format("Field '%s' is of type '%s' which is not supported for conversion to JSON string.",
fieldPath, logicalType),
actionMessage).withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
return;
}
Schema.Type type = getEffectiveType(schema);
List<Schema.Field> fields = getEffectiveFields(schema);
String errorMessage = String.format(
"Field '%s' is of type '%s' which is not supported for conversion to JSON string.", fieldPath, type);

if (type == Schema.Type.RECORD && fields != null) {
if (jsonFields.contains(fieldPath)) {
collector.addFailure(errorMessage, actionMessage)
.withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
}
for (Schema.Field field : fields) {
path.add(field.getName());
validateJsonStringFields(field.getSchema(), jsonFields, path, collector, jsonFieldsValidated);
path.remove(path.size() - 1);
}
} else {
jsonFieldsValidated.add(fieldPath);
if (type != Schema.Type.STRING && jsonFields.contains(fieldPath)) {
collector.addFailure(errorMessage, actionMessage)
.withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
}
}
}

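/**
 * Unwraps nullability and arrays: for an array field, returns the component's
 * non-nullable type, so an array of strings still validates as STRING.
 */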
private static Schema.Type getEffectiveType(Schema schema) {
Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
return nonNullableSchema.getComponentSchema().isNullable() ?
nonNullableSchema.getComponentSchema().getNonNullable().getType() :
nonNullableSchema.getComponentSchema().getType();
}
return nonNullableSchema.getType();
}

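/**
 * Unwraps nullability and arrays: for an array of records, returns the component
 * record's fields, enabling dot-notation paths through arrays of records.
 */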
private static List<Schema.Field> getEffectiveFields(Schema schema) {
Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
return nonNullableSchema.getComponentSchema().isNullable() ?
nonNullableSchema.getComponentSchema().getNonNullable().getFields() :
nonNullableSchema.getComponentSchema().getFields();
}
return nonNullableSchema.getFields();
}

}
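
As a usage sketch of the validation above (names are illustrative; `MockFailureCollector` is the collector stub from the CDAP ETL test framework, and `sink` stands for any concrete subclass):

```java
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;

Schema schema = Schema.recordOf("output",
    Schema.Field.of("name", Schema.recordOf("fullName",
        Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("age", Schema.of(Schema.Type.INT)))),
    Schema.Field.of("raw", Schema.nullableOf(Schema.of(Schema.Type.STRING))));

MockFailureCollector collector = new MockFailureCollector();
sink.validateJsonStringFields(schema, "name.first,raw", collector);  // ok: both resolve to STRING
sink.validateJsonStringFields(schema, "name.age", collector);        // failure: INT is not STRING
sink.validateJsonStringFields(schema, "missing.path", collector);    // failure: not in the schema
```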
@@ -51,6 +51,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
private static final String SCHEME = "gs://";
protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";

@Name(Constants.Reference.REFERENCE_NAME)
@Nullable
@@ -84,6 +85,12 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
"This value is ignored if the dataset or temporary bucket already exist.")
protected String location;

@Name(NAME_JSON_STRING_FIELDS)
@Nullable
@Description("Fields in input schema that should be treated as JSON strings. " +
"The schema of these fields should be of type STRING.")
protected String jsonStringFields;

public AbstractBigQuerySinkConfig(BigQueryConnectorConfig connection, String dataset, String cmekKey, String bucket) {
super(connection, dataset, cmekKey, bucket);
}
@@ -114,6 +121,11 @@ public String getGcsChunkSize() {
return gcsChunkSize;
}

@Nullable
public String getJsonStringFields() {
return jsonStringFields;
}

public boolean isAllowSchemaRelaxation() {
return allowSchemaRelaxation == null ? false : allowSchemaRelaxation;
}
@@ -23,12 +23,21 @@

import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;

/**
* BigQueryJsonConverter converts a {@link StructuredRecord} to {@link JsonObject}
*/
public class BigQueryJsonConverter extends RecordConverter<StructuredRecord, JsonObject> {
private Set<String> jsonStringFieldsPaths;

public BigQueryJsonConverter() {
}

public BigQueryJsonConverter(Set<String> jsonStringFieldsPaths) {
this.jsonStringFieldsPaths = jsonStringFieldsPaths;
}

@Override
public JsonObject transform(StructuredRecord input, @Nullable Schema schema) throws IOException {
@@ -40,7 +49,7 @@ public JsonObject transform(StructuredRecord input, @Nullable Schema schema) thr
continue;
}
BigQueryRecordToJson.write(writer, recordField.getName(), input.get(recordField.getName()),
recordField.getSchema());
recordField.getSchema(), jsonStringFieldsPaths);
}
writer.endObject();
return writer.get().getAsJsonObject();
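
A hedged usage sketch of the converter (field names are illustrative; the paths here are root-level field names, matching how `jsonStringFieldsPaths` is consulted):

```java
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import com.google.gson.JsonObject;
import java.util.Collections;

Schema schema = Schema.recordOf("row",
    Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("attributes", Schema.of(Schema.Type.STRING)));

StructuredRecord record = StructuredRecord.builder(schema)
    .set("id", 42L)
    .set("attributes", "{\"color\":\"red\"}")
    .build();

// Without paths, "attributes" stays an escaped string in the output row;
// when its path is supplied, the value is embedded as a JSON object instead.
JsonObject plain = new BigQueryJsonConverter().transform(record, schema);
JsonObject parsed = new BigQueryJsonConverter(Collections.singleton("attributes"))
    .transform(record, schema);
```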
@@ -29,6 +29,7 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
@@ -70,6 +71,15 @@ protected BigQueryMultiSinkConfig getConfig() {
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
super.configurePipeline(pipelineConfigurer);

StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
FailureCollector collector = configurer.getFailureCollector();
Schema inputSchema = configurer.getInputSchema();
String jsonStringFields = config.getJsonStringFields();
if (jsonStringFields != null && inputSchema != null) {
validateJsonStringFields(inputSchema, jsonStringFields, collector);
}
collector.getOrThrowException();
}

@Override
@@ -134,7 +144,7 @@ protected void configureOutputSchemas(BatchSinkContext context,
outputName = sanitizeOutputName(outputName);
initOutput(context, bigQuery, outputName,
BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), tableName),
tableName, tableSchema, bucket, context.getFailureCollector(), tableName);
tableName, tableSchema, bucket, context.getFailureCollector(), tableName, table);
} catch (IOException e) {
collector.addFailure("Invalid schema: " + e.getMessage(), null);
}
@@ -87,9 +87,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -121,9 +123,12 @@ public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptC
io.cdap.cdap.api.data.schema.Schema schema)
throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
String jsonStringFields = configuration.get(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, null);
Set<String> jsonFields = jsonStringFields == null ? Collections.emptySet() :
new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
return new BigQueryRecordWriter(getDelegate(configuration).getRecordWriter(taskAttemptContext),
BigQueryOutputConfiguration.getFileFormat(configuration),
schema);
schema, jsonFields);
}
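
The comma-separated list travels from the sink to this output format through a single Hadoop configuration key; a minimal sketch of that round-trip (values illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

Configuration conf = new Configuration();
conf.set(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, "nestedObject.nestedArray.raw,raw");

String value = conf.get(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, null);
Set<String> paths = value == null
    ? Collections.emptySet()
    : new HashSet<>(Arrays.asList(value.split(",")));
// Note: split(",") does not trim, so "a, b" yields the path " b" with a leading
// space; keep the configured list free of whitespace around commas.
```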

private io.cdap.cdap.api.data.schema.Schema getOutputSchema(Configuration configuration) throws IOException {
[Diffs for the remaining 10 changed files not shown.]
