
[PLUGIN-1705] Sink job label support #1319

Merged
merged 1 commit on Nov 7, 2023
8 changes: 8 additions & 0 deletions docs/BigQueryMultiTable-batchsink.md
@@ -42,6 +42,14 @@ write BigQuery data to this project.
Datasets are top-level containers that are used to organize and control access to tables and views.
If the dataset does not exist, it will be created.

**BQ Job Labels:** Key-value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)

`job_source` and `type` are system-defined labels used by CDAP for internal purposes and cannot be used as label keys.
Macro format is supported, for example `key1:val1,key2:val2`.

Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
For more information about labels, see the [BigQuery documentation](https://cloud.google.com/bigquery/docs/labels-intro#requirements).
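For example, `team:etl,env:prod,adhoc` (illustrative values) attaches the labels `team=etl` and `env=prod`, plus the valueless label `adhoc`, to every BigQuery job the sink submits, alongside the system-defined labels.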

**Temporary Bucket Name:** Google Cloud Storage bucket to store temporary data in.
It will be automatically created if it does not exist. Temporary data will be deleted after it is loaded into BigQuery.
If the bucket was created automatically, it will be deleted after the run finishes.
8 changes: 8 additions & 0 deletions docs/BigQueryTable-batchsink.md
@@ -52,6 +52,14 @@ bucket will be created and then deleted after the run finishes.

**GCS Upload Request Chunk Size**: GCS upload request chunk size in bytes. Default value is 8388608 bytes.

**BQ Job Labels:** Key-value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)

`job_source` and `type` are system-defined labels used by CDAP for internal purposes and cannot be used as label keys.
Macro format is supported, for example `key1:val1,key2:val2`.

Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
For more information about labels, see the [BigQuery documentation](https://cloud.google.com/bigquery/docs/labels-intro#requirements).

**JSON String**: List of fields to be written to BigQuery as a JSON string.
The fields must be of type STRING. To target nested fields, use dot notation.
For example, 'name.first' will target the 'first' field in the 'name' record. (Macro Enabled)
@@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception {
}

// Add labels for the BigQuery Execute job.
- builder.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG));
+ builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG));

QueryJobConfiguration queryConfig = builder.build();

@@ -107,12 +107,17 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
Bucket bucket = storage.get(bucketName);

// Propagate the user-defined job label key-value pairs into the Hadoop configuration;
// the output committer reads them back when it submits BigQuery jobs.
String jobLabelKeyValue = getConfig().getJobLabelKeyValue();
if (jobLabelKeyValue != null) {
baseConfiguration.set(BigQueryConstants.CONFIG_JOB_LABEL_KEY_VALUE, jobLabelKeyValue);
}

if (!context.isPreviewEnabled()) {
BigQuerySinkUtils.createResources(bigQuery, dataset, datasetId,
storage, bucket, bucketName,
config.getLocation(), cmekKeyName);
}

prepareRunInternal(context, bigQuery, bucketName);
}

@@ -34,6 +34,7 @@
import io.cdap.plugin.gcp.common.CmekUtils;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@@ -49,6 +50,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
public static final String NAME_TRUNCATE_TABLE = "truncateTable";
public static final String NAME_LOCATION = "location";
private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
private static final String SCHEME = "gs://";
protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";
@@ -85,6 +87,13 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
"This value is ignored if the dataset or temporary bucket already exist.")
protected String location;

@Name(NAME_BQ_JOB_LABELS)
@Macro
@Nullable
@Description("Key value pairs to be added as labels to the BigQuery job. Keys must be unique. [job_source, type] " +
"are reserved keys and cannot be used as label keys.")
protected String jobLabelKeyValue;

@Name(NAME_JSON_STRING_FIELDS)
@Nullable
@Description("Fields in input schema that should be treated as JSON strings. " +
@@ -120,6 +129,10 @@ protected String getTable() {
public String getGcsChunkSize() {
return gcsChunkSize;
}

@Nullable
public String getJobLabelKeyValue() {
return jobLabelKeyValue;
}

@Nullable
public String getJsonStringFields() {
@@ -138,7 +151,6 @@ public JobInfo.WriteDisposition getWriteDisposition() {
public boolean isTruncateTableSet() {
return truncateTable != null && truncateTable;
}

public void validate(FailureCollector collector) {
validate(collector, Collections.emptyMap());
}
@@ -161,6 +173,9 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
if (!containsMacro(NAME_CMEK_KEY)) {
validateCmekKey(collector, arguments);
}
if (!containsMacro(NAME_BQ_JOB_LABELS)) {
validateJobLabelKeyValue(collector);
}
}

void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
@@ -172,6 +187,108 @@ void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments)
validateCmekKeyLocation(cmekKeyName, null, location, failureCollector);
}

/**
* Validates job label key-value pairs against the following rules:
* Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
* Defined in the following link:
* <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
* @param failureCollector failure collector
*/
void validateJobLabelKeyValue(FailureCollector failureCollector) {
Set<String> reservedKeys = BigQueryUtil.BQ_JOB_LABEL_SYSTEM_KEYS;
int maxLabels = 64 - reservedKeys.size();
int maxKeyLength = 63;
int maxValueLength = 63;

String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
String capitalLetterRegex = ".*[A-Z].*";
String labelKeyValue = getJobLabelKeyValue();

if (Strings.isNullOrEmpty(labelKeyValue)) {
return;
}

String[] keyValuePairs = labelKeyValue.split(",");
Set<String> uniqueKeys = new HashSet<>();

for (String keyValuePair : keyValuePairs) {

// Adding a label without a value is valid behavior
// Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
String[] keyValue = keyValuePair.trim().split(":");
boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
boolean isValuePresent = keyValue.length == 2;

if (!isKeyPresent) {
failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
"Job label key value pair should be in the format 'key:value'.")
.withConfigProperty(NAME_BQ_JOB_LABELS);
continue;
}

// Check if key is reserved
if (reservedKeys.contains(keyValue[0])) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
"A system label already exists with same name.").withConfigProperty(NAME_BQ_JOB_LABELS);
continue;
}

String key = keyValue[0];
String value = isValuePresent ? keyValue[1] : "";
boolean isKeyValid = true;
boolean isValueValid = true;

// Key cannot be empty
if (Strings.isNullOrEmpty(key)) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key cannot be empty.").withConfigProperty(NAME_BQ_JOB_LABELS);
isKeyValid = false;
}

// Key cannot be longer than 63 characters
if (key.length() > maxKeyLength) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
isKeyValid = false;
}

// Value cannot be longer than 63 characters
if (value.length() > maxValueLength) {
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
"Job label value cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
isValueValid = false;
}

if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key can only contain lowercase letters, numeric characters, " +
"underscores, and dashes. Check docs for more details.")
.withConfigProperty(NAME_BQ_JOB_LABELS);
isKeyValid = false;
}

if (isValuePresent && isValueValid &&
(!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
"Job label value can only contain lowercase letters, numeric characters, " +
"underscores, and dashes.").withConfigProperty(NAME_BQ_JOB_LABELS);
}

if (isKeyValid && !uniqueKeys.add(key)) {
failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
"Job label key should be unique.").withConfigProperty(NAME_BQ_JOB_LABELS);
}
}
// Check if number of labels is greater than 64 - reserved keys
if (uniqueKeys.size() > maxLabels) {
failureCollector.addFailure("Number of job labels exceeds the limit.",
String.format("Number of job labels cannot be greater than %d.", maxLabels))
.withConfigProperty(NAME_BQ_JOB_LABELS);
}
}
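// Worked examples (illustrative inputs, not from the source) of what the checks
// above accept and reject:
//   "team:etl,env:prod"  -> valid: unique lowercase keys, each with a value
//   "env"                -> valid: a key without a value is allowed
//   "Env:prod"           -> invalid: keys cannot contain capital letters
//   "team:a,team:b"      -> invalid: duplicate key 'team'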

public String getDatasetProject() {
return connection == null ? null : connection.getDatasetProject();
}
@@ -232,6 +232,7 @@ public void commitJob(JobContext jobContext) throws IOException {
allowSchemaRelaxationOnEmptyOutput =
conf.getBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION_ON_EMPTY_OUTPUT, false);
LOG.debug("Allow schema relaxation: '{}'", allowSchemaRelaxation);
String jobLabelKeyValue = conf.get(BigQueryConstants.CONFIG_JOB_LABEL_KEY_VALUE, null);
PartitionType partitionType = conf.getEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, PartitionType.NONE);
LOG.debug("Create Partitioned Table type: '{}'", partitionType);
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType = conf.getEnum(
@@ -263,7 +264,7 @@ public void commitJob(JobContext jobContext) throws IOException {
try {
importFromGcs(destProjectId, destTable, destSchema.orElse(null), kmsKeyName, outputFileFormat,
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
- requirePartitionFilter, clusteringOrderList, tableExists, conf);
+ requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
} catch (Exception e) {
throw new IOException("Failed to import GCS into BigQuery. ", e);
}
@@ -309,7 +310,8 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
List<String> gcsPaths, PartitionType partitionType,
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType,
@Nullable Range range, @Nullable String partitionByField, boolean requirePartitionFilter,
- List<String> clusteringOrderList, boolean tableExists, Configuration conf)
+ List<String> clusteringOrderList, boolean tableExists, String jobLabelKeyValue,
+ Configuration conf)
throws IOException, InterruptedException {
LOG.info("Importing into table '{}' from {} paths; path[0] is '{}'; awaitCompletion: {}",
BigQueryStrings.toString(tableRef), gcsPaths.size(), gcsPaths.isEmpty() ? "(empty)" : gcsPaths.get(0),
@@ -431,18 +433,18 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable

JobConfiguration config = new JobConfiguration();
config.setLoad(loadConfig);
- config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+ config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));
triggerBigqueryJob(projectId, jobId , dataset, config, tableRef);
} else {
// First load the data in a temp table.
- loadInBatchesInTempTable(tableRef, loadConfig, gcsPaths, projectId, jobId, dataset);
+ loadInBatchesInTempTable(tableRef, loadConfig, gcsPaths, projectId, jobId, dataset, jobLabelKeyValue);

if (operation.equals(Operation.INSERT)) { // Handles the case when there are more than 10000 GCS paths
handleInsertOperation(tableRef, writeDisposition, loadConfig.getDestinationEncryptionConfiguration(),
- projectId, jobId, dataset, tableExists);
+ projectId, jobId, dataset, tableExists, jobLabelKeyValue);
} else {
handleUpdateUpsertOperation(tableRef, tableExists, kmsKeyName, getJobIdForUpdateUpsert(conf),
- projectId, dataset);
+ projectId, dataset, jobLabelKeyValue);
}
}

@@ -471,7 +473,8 @@ private void triggerBigqueryJob(String projectId, String jobId, Dataset dataset,
}

private void loadInBatchesInTempTable(TableReference tableRef, JobConfigurationLoad loadConfig,
- List<String> gcsPaths, String projectId, String jobId, Dataset dataset)
+ List<String> gcsPaths, String projectId, String jobId, Dataset dataset,
+ String jobLabelKeyValue)
throws IOException, InterruptedException {

LOG.info(" Importing into a temporary table first in batches of 10000");
@@ -495,7 +498,7 @@ private void loadInBatchesInTempTable(TableReference tableRef, JobConfigurationLoad loadConfig,
loadConfig.setSourceUris(gcsPathBatch);
JobConfiguration config = new JobConfiguration();
config.setLoad(loadConfig);
- config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+ config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));

triggerBigqueryJob(projectId, jobId + "_" + jobcount, dataset, config, tableRef);
jobcount++;
@@ -627,7 +630,8 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws IOException {

private void handleInsertOperation(TableReference tableRef, String writeDisposition,
EncryptionConfiguration encryptionConfiguration, String projectId, String jobId,
- Dataset dataset, boolean tableExists) throws IOException, InterruptedException {
+ Dataset dataset, boolean tableExists,
+ String jobLabelKeyValue) throws IOException, InterruptedException {
if (allowSchemaRelaxation && tableExists) {
updateTableSchema(tableRef);
}
@@ -639,7 +643,7 @@ private void handleInsertOperation(TableReference tableRef, String writeDisposition,

JobConfiguration config = new JobConfiguration();
config.setCopy(tableCopyConfig);
- config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+ config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));
triggerBigqueryJob(projectId, jobId, dataset, config, tableRef);
}

@@ -648,7 +652,8 @@ private void handleUpdateUpsertOperation(TableReference tableRef,
@Nullable String cmekKey,
JobId jobId,
String projectId,
- Dataset dataset) throws IOException, InterruptedException {
+ Dataset dataset,
+ String jobLabelKeyValue) throws IOException, InterruptedException {
if (allowSchemaRelaxation && tableExists) {
updateTableSchema(tableRef);
}
@@ -677,7 +682,7 @@ private void handleUpdateUpsertOperation(TableReference tableRef,

// Create Job Configuration and add job labels
JobConfiguration jobConfiguration = new JobConfiguration();
- jobConfiguration.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+ jobConfiguration.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));
jobConfiguration.setQuery(jobConfigurationQuery);

// Trigger job execution
@@ -197,12 +197,13 @@ private BigQuerySinkConfig(@Nullable String referenceName, @Nullable String project,
@Nullable String serviceAccountType, @Nullable String serviceFilePath,
@Nullable String serviceAccountJson,
@Nullable String dataset, @Nullable String table, @Nullable String location,
- @Nullable String cmekKey, @Nullable String bucket) {
+ @Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue) {
super(new BigQueryConnectorConfig(project, project, serviceAccountType,
serviceFilePath, serviceAccountJson), dataset, cmekKey, bucket);
this.referenceName = referenceName;
this.table = table;
this.location = location;
this.jobLabelKeyValue = jobLabelKeyValue;
}

public String getTable() {
@@ -696,6 +697,7 @@ public static class Builder {
private String cmekKey;
private String location;
private String bucket;
private String jobLabelKeyValue;

public BigQuerySinkConfig.Builder setReferenceName(@Nullable String referenceName) {
this.referenceName = referenceName;
@@ -746,6 +748,10 @@ public BigQuerySinkConfig.Builder setBucket(@Nullable String bucket) {
this.bucket = bucket;
return this;
}

public BigQuerySinkConfig.Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) {
this.jobLabelKeyValue = jobLabelKeyValue;
return this;
}

public BigQuerySinkConfig build() {
return new BigQuerySinkConfig(
@@ -758,7 +764,8 @@ public BigQuerySinkConfig build() {
table,
location,
cmekKey,
- bucket
+ bucket,
+ jobLabelKeyValue
);
}
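The new setter wires the label string into config construction. A minimal test-style sketch of how it might be exercised, assuming a public no-arg `Builder` constructor and CDAP's `MockFailureCollector` on the test classpath (both assumptions; neither is shown in this diff):

```java
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;

public class BigQuerySinkConfigLabelSketch {
  public static void main(String[] args) {
    // "Env" contains a capital letter, so validation should report a failure.
    BigQuerySinkConfig config = new BigQuerySinkConfig.Builder()
        .setReferenceName("bqSink")
        .setJobLabelKeyValue("team:etl,Env:prod")
        .build();

    MockFailureCollector collector = new MockFailureCollector();
    config.validate(collector);
    // Expect a single failure, attributed to the "jobLabels" property.
    System.out.println(collector.getValidationFailures().size());
  }
}
```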

@@ -257,7 +257,7 @@ private static void runQuery(Configuration configuration,

JobConfiguration config = new JobConfiguration();
config.setQuery(queryConfig);
- config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SOURCE_TAG));
+ config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SOURCE_TAG));

JobReference jobReference = getJobReference(configuration, bigQueryHelper, projectId, location);

@@ -357,7 +357,7 @@ public static Map<String, String> getJobTags(BigQueryJobType operation) {
* @return Map containing tags for a job.
*/
public static Map<String, String> getJobTags(String operation) {
- Map<String, String> labels = BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_PUSHDOWN_TAG);
+ Map<String, String> labels = BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_PUSHDOWN_TAG);
labels.put("pushdown_operation", operation);
return labels;
}
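The body of `getJobLabels` itself is outside this excerpt. A plausible sketch, with the two signatures inferred from the call sites in this PR; the system-label values and the single-argument overload are assumptions:

```java
// Sketch only; not the actual BigQueryUtil source.
public static Map<String, String> getJobLabels(String jobType) {
  return getJobLabels(jobType, null);
}

public static Map<String, String> getJobLabels(String jobType, @Nullable String jobLabelKeyValue) {
  Map<String, String> labels = new HashMap<>();
  // System-defined labels; these keys make up the reserved BQ_JOB_LABEL_SYSTEM_KEYS.
  labels.put("job_source", "cdap");   // placeholder value
  labels.put("type", jobType);
  if (Strings.isNullOrEmpty(jobLabelKeyValue)) {
    return labels;
  }
  for (String pair : jobLabelKeyValue.split(",")) {
    String[] keyValue = pair.trim().split(":");
    // A key with no value is legal and is stored with an empty value.
    labels.put(keyValue[0], keyValue.length == 2 ? keyValue[1] : "");
  }
  return labels;
}
```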