Skip to content

Commit

Permalink
Added logic to control which stages are forced and skipped from SQL E…
Browse files Browse the repository at this point in the history
…ngine execution
  • Loading branch information
fernst committed Feb 25, 2022
1 parent de91638 commit cf46a5e
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ public void cleanup(String datasetName) throws SQLEngineException {
}

/**
*
* @return capabilities provided by {@link BigQuerySQLEngine}, which include {@link StringExpressionFactoryType}.SQL.
*/
@Override
Expand All @@ -425,7 +424,6 @@ public Set<Capability> getCapabilities() {
}

/**
*
* @return the single expression factory provided by {@link BigQuerySQLEngine}, which is {@link SQLExpressionFactory}.
*/
@Override
Expand All @@ -439,7 +437,7 @@ public Relation getRelation(SQLRelationDefinition relationDefinition) {
Set<String> columnSet = new LinkedHashSet<>();
List<Schema.Field> fields = relationDefinition.getSchema().getFields();
if (fields != null) {
for (Schema.Field field: fields) {
for (Schema.Field field : fields) {
columnSet.add(field.getName());
}
}
Expand All @@ -458,6 +456,26 @@ public boolean supportsRelationalTranform() {
return true;
}

/**
 * Checks whether the supplied input schema can be handled by this engine.
 * Delegates to {@link BigQuerySQLEngineUtils#isSupportedSchema}.
 *
 * @param schema input schema for a stage considered for pushdown
 * @return true if the schema is supported by the BigQuery SQL engine
 */
@Override
public boolean supportsInputSchema(Schema schema) {
return BigQuerySQLEngineUtils.isSupportedSchema(schema);
}

/**
 * Checks whether the supplied output schema can be handled by this engine.
 * Delegates to {@link BigQuerySQLEngineUtils#isSupportedSchema}.
 *
 * @param schema output schema for a stage considered for pushdown
 * @return true if the schema is supported by the BigQuery SQL engine
 */
@Override
public boolean supportsOutputSchema(Schema schema) {
return BigQuerySQLEngineUtils.isSupportedSchema(schema);
}

/**
 * @return names of stages that should always be pushed down to this engine,
 *         as configured in the engine configuration; empty set when none are configured.
 */
@Override
public Set<String> getIncludedStageNames() {
return sqlEngineConfig.getIncludedStages();
}

/**
 * @return names of stages that should never be pushed down to this engine,
 *         as configured in the engine configuration; empty set when none are configured.
 */
@Override
public Set<String> getExcludedStageNames() {
return sqlEngineConfig.getExcludedStages();
}

@Override
public boolean canTransform(SQLTransformDefinition transformDefinition) {
Relation relation = transformDefinition.getOutputRelation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -31,6 +33,9 @@

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;

/**
Expand All @@ -42,13 +47,16 @@ public class BigQuerySQLEngineConfig extends BigQueryBaseConfig {
public static final String NAME_RETAIN_TABLES = "retainTables";
public static final String NAME_TEMP_TABLE_TTL_HOURS = "tempTableTTLHours";
public static final String NAME_JOB_PRIORITY = "jobPriority";
public static final String NAME_INCLUDED_STAGES = "includedStages";
public static final String NAME_EXCLUDED_STAGES = "excludedStages";
public static final String NAME_USE_STORAGE_READ_API = "useStorageReadAPI";
public static final String NAME_DIRECT_SINK_WRITE = "useDirectSinkWrite";

// Job priority options
public static final String PRIORITY_BATCH = "batch";
public static final String PRIORITY_INTERACTIVE = "interactive";
private static final String SCHEME = "gs://";
private static final String STAGE_SPLIT = "\u0001";

@Name(NAME_LOCATION)
@Macro
Expand Down Expand Up @@ -98,6 +106,20 @@ public class BigQuerySQLEngineConfig extends BigQueryBaseConfig {
"succeed, the standard sink workflow will continue to execute.")
private Boolean useDirectSinkWrite;

@Name(NAME_INCLUDED_STAGES)
@Macro
@Nullable
@Description("Stages that should always be pushed down to the BigQuery ELT Transformation Pushdown engine, " +
"if supported by the engine. Each stage name should be in a separate line.")
protected String includedStages;

@Name(NAME_EXCLUDED_STAGES)
@Macro
@Nullable
@Description("Stages that should never be pushed down to the BigQuery ELT Transformation Pushdown engine, " +
"even when supported. Each stage name should be in a separate line.")
protected String excludedStages;


private BigQuerySQLEngineConfig(@Nullable BigQueryConnectorConfig connection,
@Nullable String dataset, @Nullable String location,
Expand All @@ -121,6 +143,24 @@ public Integer getTempTableTTLHours() {
return tempTableTTLHours != null && tempTableTTLHours > 0 ? tempTableTTLHours : 72;
}

/**
 * @return stage names parsed from the "includedStages" config value;
 *         empty set when the property is unset or blank.
 */
public Set<String> getIncludedStages() {
return splitStages(includedStages);
}

/**
 * @return stage names parsed from the "excludedStages" config value;
 *         empty set when the property is unset or blank.
 */
public Set<String> getExcludedStages() {
return splitStages(excludedStages);
}

/**
 * Splits a delimiter-joined stage list into the set of individual stage names.
 *
 * @param stages stage names joined with the {@code STAGE_SPLIT} delimiter; may be null or empty
 * @return set of non-empty stage names, or an empty set when no input was supplied.
 *         Note that whitespace-only entries are preserved as stage names.
 */
@VisibleForTesting
protected static Set<String> splitStages(String stages) {
if (Strings.isNullOrEmpty(stages)) {
return Collections.emptySet();
}
// Split on the delimiter and drop empty entries; split() never yields null elements,
// so an isEmpty() check is sufficient here. Whitespace is intentionally not trimmed.
Stream<String> stageNames = Stream.of(stages.split(STAGE_SPLIT));
return stageNames
.filter(stageName -> !stageName.isEmpty())
.collect(Collectors.toSet());
}

public Boolean shouldUseStorageReadAPI() {
return useStorageReadAPI != null ? useStorageReadAPI : false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ public static void validateJoinOnKeyStages(JoinDefinition joinDefinition, List<S
}
}

/**
 * Check if the supplied schema is supported by the SQL Engine.
 * Delegates the decision to {@link BigQuerySchemaValidation#validateSchema}.
 *
 * @param schema supplied schema to validate
 * @return whether this schema is supported by the SQL engine.
 */
public static boolean isSupportedSchema(Schema schema) {
return BigQuerySchemaValidation.validateSchema(schema).isSupported();
}

/**
* Ensure the Stage name is valid for execution in BQ pushdown.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.bigquery.sqlengine;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashSet;
import java.util.Set;

/**
* Test for {@link BigQuerySQLEngineConfig} class
*/
/**
 * Test for {@link BigQuerySQLEngineConfig} class
 */
public class BigQuerySQLEngineConfigTest {

  @Test
  public void testSplitStages() {
    // Null and empty input both yield an empty set.
    Assert.assertEquals(expected(), BigQuerySQLEngineConfig.splitStages(null));
    Assert.assertEquals(expected(), BigQuerySQLEngineConfig.splitStages(""));

    // Whitespace-only entries are preserved as stage names.
    Assert.assertEquals(expected(" "), BigQuerySQLEngineConfig.splitStages(" "));

    // Entries are separated by the \u0001 delimiter.
    Assert.assertEquals(expected(" ", "a"),
                        BigQuerySQLEngineConfig.splitStages(" \u0001a"));

    // Stage names may contain internal spaces.
    Assert.assertEquals(expected(" ", "a", "this is some stage"),
                        BigQuerySQLEngineConfig.splitStages(" \u0001a\u0001this is some stage"));

    // Leading and trailing whitespace within a stage name is retained.
    Assert.assertEquals(expected(" ", "a", "this is some stage", " this is another "),
                        BigQuerySQLEngineConfig.splitStages(
                          " \u0001a\u0001this is some stage\u0001 this is another "));
  }

  /**
   * Builds the expected result set from the supplied stage names.
   */
  private static Set<String> expected(String... stageNames) {
    Set<String> stages = new HashSet<>();
    for (String stageName : stageNames) {
      stages.add(stageName);
    }
    return stages;
  }
}
18 changes: 18 additions & 0 deletions widgets/BigQueryPushdownEngine-sqlengine.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,24 @@
]
}
},
{
"widget-type": "csv",
"label": "Stages to force execution in the SQL Engine",
"name": "includedStages",
"widget-attributes": {
"delimiter": "\u0001",
"placeholder": "Names of all stages to force push to execute in the BigQuery ELT engine. Each stage name must be on a separate line"
}
},
{
"widget-type": "csv",
"label": "Stages to skip from executing in the SQL Engine",
"name": "excludedStages",
"widget-attributes": {
"delimiter": "\u0001",
"placeholder": "Names of all stages to skip from executing in the BigQuery ELT engine. Each stage name must be on a separate line"
}
},
{
"widget-type": "toggle",
"label": "Use BigQuery Storage Read API",
Expand Down

0 comments on commit cf46a5e

Please sign in to comment.