Merge branch 'develop' into csm1892
psainics authored Oct 25, 2023
2 parents 70c3513 + 8cca139 commit 5f24893
Showing 16 changed files with 637 additions and 27 deletions.
7 changes: 7 additions & 0 deletions docs/BigQueryTable-batchsink.md
@@ -60,6 +60,13 @@ Macro format is supported. example `key1:val1,key2:val2`
Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
For more information about labels, see [Labeling Jobs](https://cloud.google.com/bigquery/docs/adding-labels#job-label).

**JSON String**: List of fields to be written to BigQuery as JSON strings.
The fields must be of type STRING. To target nested fields, use dot notation;
for example, 'name.first' targets the 'first' field in the 'name' record. (Macro Enabled)

To specify multiple fields, use a comma-separated list, including when the value is set through a macro.
Example: "nestedObject.nestedArray.raw, nestedArray.raw".

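For example, given a hypothetical STRING field `payload` containing the text `{"device": {"id": 7}}`, the sink would by default load the value into BigQuery as a plain string; listing `payload` in this property writes it as a JSON value instead.
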
**Operation**: Type of write operation to perform. This can be set to Insert, Update or Upsert.
* Insert - all records will be inserted into the destination table.
* Update - records that match on Table Key will be updated in the table. Records that do not match

@@ -19,6 +19,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Table;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
@@ -47,7 +48,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

@@ -148,14 +153,27 @@ public void transform(StructuredRecord input, Emitter<KeyValue<StructuredRecord,
*/
protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String fqn,
String tableName, @Nullable Schema tableSchema, String bucket,
FailureCollector collector, @Nullable String marker) throws IOException {
FailureCollector collector, @Nullable String marker,
Table table) throws IOException {
LOG.debug("Init output for table '{}' with schema: {}", tableName, tableSchema);

List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema,
getConfig().isAllowSchemaRelaxation(), getConfig().getDatasetProject(),
getConfig().getDataset(), getConfig().isTruncateTableSet(), collector);

Configuration configuration = new Configuration(baseConfiguration);
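// If the destination table already exists, derive its JSON columns from the BigQuery schema
// and record them in the job configuration so they are flagged for JSON output.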
if (table != null) {
com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
if (bqSchema != null) {
String jsonStringFields = BigQuerySinkUtils.getJsonStringFieldsFromBQSchema(bqSchema);
configuration.set(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, jsonStringFields);
BigQuerySinkUtils.setJsonStringFields(fields, jsonStringFields);
}
}

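// Also flag any fields the user explicitly listed in the JSON String configuration.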
if (getConfig().getJsonStringFields() != null && !getConfig().getJsonStringFields().isEmpty()) {
BigQuerySinkUtils.setJsonStringFields(fields, getConfig().getJsonStringFields());
}

// Build GCS storage path for this bucket output.
String temporaryGcsPath = BigQuerySinkUtils.getTemporaryGcsPath(bucket, runUUID.toString(), tableName);
@@ -232,6 +250,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
config.isAllowSchemaRelaxation());
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
config.getWriteDisposition().name());
baseConfiguration.setStrings(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, config.getJsonStringFields());
// This setting is needed because GCS uses a default chunk size of 64 MB, which is large enough to
// cause OOM issues when many tables are written at once. See CDAP-16670.
String gcsChunkSize = "8388608";
@@ -313,4 +332,82 @@ protected Configuration getOutputConfiguration() throws IOException {
return configuration;
}

/**
* Validates that the fields to be converted to JSON strings are present in the Output Schema.
* @param schema Output Schema.
* @param jsonStringFields Comma-separated list of fields to be converted to JSON strings.
* @param collector FailureCollector to collect errors.
*/
public void validateJsonStringFields(Schema schema,
String jsonStringFields, FailureCollector collector) {
Set<String> jsonFields = new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
Set<String> jsonFieldsValidated = new HashSet<>();
validateJsonStringFields(schema, jsonFields, new ArrayList<>(), collector, jsonFieldsValidated);
jsonFields.removeAll(jsonFieldsValidated);
if (!jsonFields.isEmpty()) {
collector.addFailure(String.format("Field(s) '%s' are not present in the Output Schema.",
String.join(", ", jsonFields)),
"Remove the field(s) from the list of fields to be converted to JSON strings.")
.withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
}
}

private void validateJsonStringFields(Schema schema, Set<String> jsonFields, ArrayList<String> path,
FailureCollector collector, Set<String> jsonFieldsValidated) {
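// Depth-first walk of the schema; 'path' tracks the dotted path to the current field.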
String fieldPath = String.join(".", path);
String actionMessage = "Only type 'STRING' is supported.";

Schema.LogicalType logicalType = schema.isNullable() ? schema.getNonNullable().getLogicalType() :
schema.getLogicalType();
if (logicalType != null && jsonFields.contains(fieldPath)) {
collector.addFailure(
String.format("Field '%s' is of type '%s' which is not supported for conversion to JSON string.",
fieldPath, logicalType),
actionMessage).withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
return;
}
Schema.Type type = getEffectiveType(schema);
List<Schema.Field> fields = getEffectiveFields(schema);
String errorMessage = String.format(
"Field '%s' is of type '%s' which is not supported for conversion to JSON string.", fieldPath, type);

if (type == Schema.Type.RECORD && fields != null) {
if (jsonFields.contains(fieldPath)) {
collector.addFailure(errorMessage, actionMessage)
.withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
}
for (Schema.Field field : fields) {
path.add(field.getName());
validateJsonStringFields(field.getSchema(), jsonFields, path, collector, jsonFieldsValidated);
path.remove(path.size() - 1);
}
} else {
jsonFieldsValidated.add(fieldPath);
if (type != Schema.Type.STRING && jsonFields.contains(fieldPath)) {
collector.addFailure(errorMessage, actionMessage)
.withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
}
}
}

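// The helpers below unwrap nullable and array wrappers; for an array, the component type is
// what matters for JSON conversion, so fields inside arrays can also be targeted.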
private static Schema.Type getEffectiveType(Schema schema) {
Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
return nonNullableSchema.getComponentSchema().isNullable() ?
nonNullableSchema.getComponentSchema().getNonNullable().getType() :
nonNullableSchema.getComponentSchema().getType();
}
return nonNullableSchema.getType();
}

private static List<Schema.Field> getEffectiveFields(Schema schema) {
Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
return nonNullableSchema.getComponentSchema().isNullable() ?
nonNullableSchema.getComponentSchema().getNonNullable().getFields() :
nonNullableSchema.getComponentSchema().getFields();
}
return nonNullableSchema.getFields();
}

}
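
For reference, a minimal sketch of how the new validation behaves. The `sink` instance and CDAP's `MockFailureCollector` test helper are assumptions for illustration, not part of this commit:

Schema schema = Schema.recordOf("output",
  Schema.Field.of("name", Schema.recordOf("name",
    Schema.Field.of("first", Schema.of(Schema.Type.STRING)))),
  Schema.Field.of("age", Schema.of(Schema.Type.INT)));

MockFailureCollector collector = new MockFailureCollector("sink");
sink.validateJsonStringFields(schema, "name.first", collector); // OK: STRING leaf, no failure added
sink.validateJsonStringFields(schema, "age", collector);        // failure: 'age' is INT, not STRING
sink.validateJsonStringFields(schema, "name.last", collector);  // failure: not present in the output schema
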
@@ -53,6 +53,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
public static final String NAME_JOB_LABEL_KEY_VALUE = "jobLabelKeyValue";
protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
private static final String SCHEME = "gs://";
protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";

@Name(Constants.Reference.REFERENCE_NAME)
@Nullable
@@ -93,6 +94,12 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
"are reserved keys and cannot be used as label keys.")
protected String jobLabelKeyValue;

@Name(NAME_JSON_STRING_FIELDS)
@Nullable
@Description("Fields in input schema that should be treated as JSON strings. " +
"The schema of these fields should be of type STRING.")
protected String jsonStringFields;

public AbstractBigQuerySinkConfig(BigQueryConnectorConfig connection, String dataset, String cmekKey, String bucket) {
super(connection, dataset, cmekKey, bucket);
}
@@ -127,6 +134,11 @@ public String getJobLabelKeyValue() {
return jobLabelKeyValue;
}

@Nullable
public String getJsonStringFields() {
return jsonStringFields;
}

public boolean isAllowSchemaRelaxation() {
return allowSchemaRelaxation == null ? false : allowSchemaRelaxation;
}

@@ -23,12 +23,21 @@

import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;

/**
* BigQueryJsonConverter converts a {@link StructuredRecord} to {@link JsonObject}
*/
public class BigQueryJsonConverter extends RecordConverter<StructuredRecord, JsonObject> {
private Set<String> jsonStringFieldsPaths;

public BigQueryJsonConverter() {
}

public BigQueryJsonConverter(Set<String> jsonStringFieldsPaths) {
this.jsonStringFieldsPaths = jsonStringFieldsPaths;
}

@Override
public JsonObject transform(StructuredRecord input, @Nullable Schema schema) throws IOException {
@@ -40,7 +49,7 @@ public JsonObject transform(StructuredRecord input, @Nullable Schema schema) thr
continue;
}
BigQueryRecordToJson.write(writer, recordField.getName(), input.get(recordField.getName()),
recordField.getSchema());
recordField.getSchema(), jsonStringFieldsPaths);
}
writer.endObject();
return writer.get().getAsJsonObject();
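
// A minimal usage sketch, not from this commit: the 'payload' field and its contents are
// illustrative, and the expectation that registered paths are written as raw JSON (rather
// than escaped strings) follows from the feature's intent.
// (transform throws IOException; java.util.Collections assumed imported)
Schema schema = Schema.recordOf("out",
  Schema.Field.of("payload", Schema.of(Schema.Type.STRING)));
StructuredRecord record = StructuredRecord.builder(schema)
  .set("payload", "{\"a\": 1}")
  .build();
JsonObject plain = new BigQueryJsonConverter().transform(record, schema);  // 'payload' escaped as a string
JsonObject asJson = new BigQueryJsonConverter(Collections.singleton("payload"))
  .transform(record, schema);                                              // 'payload' written as JSON
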
@@ -29,6 +29,7 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
@@ -70,6 +71,15 @@ protected BigQueryMultiSinkConfig getConfig() {
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
super.configurePipeline(pipelineConfigurer);

StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
FailureCollector collector = configurer.getFailureCollector();
Schema inputSchema = configurer.getInputSchema();
String jsonStringFields = config.getJsonStringFields();
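// Either value may be unknown at configure time (e.g. the field list is a macro, or the
// input schema is not yet fixed); skip validation in that case.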
if (jsonStringFields != null && inputSchema != null) {
validateJsonStringFields(inputSchema, jsonStringFields, collector);
}
collector.getOrThrowException();
}

@Override
@@ -134,7 +144,7 @@ protected void configureOutputSchemas(BatchSinkContext context,
outputName = sanitizeOutputName(outputName);
initOutput(context, bigQuery, outputName,
BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), tableName),
tableName, tableSchema, bucket, context.getFailureCollector(), tableName);
tableName, tableSchema, bucket, context.getFailureCollector(), tableName, table);
} catch (IOException e) {
collector.addFailure("Invalid schema: " + e.getMessage(), null);
}

@@ -87,9 +87,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -121,9 +123,12 @@ public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptC
io.cdap.cdap.api.data.schema.Schema schema)
throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
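// Recover the JSON-string field paths that the sink recorded in the job configuration.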
String jsonStringFields = configuration.get(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, null);
Set<String> jsonFields = jsonStringFields == null ? Collections.emptySet() :
new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
return new BigQueryRecordWriter(getDelegate(configuration).getRecordWriter(taskAttemptContext),
BigQueryOutputConfiguration.getFileFormat(configuration),
schema);
schema, jsonFields);
}

private io.cdap.cdap.api.data.schema.Schema getOutputSchema(Configuration configuration) throws IOException {