Skip to content

Commit

Permalink
DAT-18792: added cluster by support for snapshot and diff table relat…
Browse files Browse the repository at this point in the history
…ed change types
  • Loading branch information
Mykhailo Savchenko committed Oct 15, 2024
1 parent 31efa0e commit 32bd731
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {

private static final String CLUSTERING_INFORMATION_TBL_PROPERTY_START = "clusteringColumns=[[";

@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
Expand Down Expand Up @@ -51,6 +55,13 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
createTableChangeDatabricks.setRemarks(temp.getRemarks());
createTableChangeDatabricks.setIfNotExists(temp.getIfNotExists());
createTableChangeDatabricks.setRowDependencies(temp.getRowDependencies());
if(extendedTableProperties.getTblProperties().contains(CLUSTERING_INFORMATION_TBL_PROPERTY_START)) {
//This approach should be rewritten after implementing tblProperties map parsing to Map struct collection
String tblProperties = extendedTableProperties.getTblProperties();
createTableChangeDatabricks.setClusterColumns(parseClusterColumns(tblProperties));
//Remove clusteringColumns property from tblProperties
extendedTableProperties.setTblProperties(tblProperties.replace(getClusteringColumnsSubstring(tblProperties), ""));
}

createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties);
return createTableChangeDatabricks;
Expand All @@ -60,4 +71,44 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
protected CreateTableChange createCreateTableChange() {
    // Factory override: produce the Databricks-specific CreateTableChange subtype
    // so the base MissingTableChangeGenerator machinery works with it.
    return new CreateTableChangeDatabricks();
}
}

/**
 * Extracts the clustering column names from the raw tblProperties string.
 * <p>
 * Known edge cases with the raw table-properties map:
 * <ul>
 *   <li>A user-specified custom tblProperty that appears before the Databricks-managed
 *       {@code clusteringColumns} property and itself contains the snippet
 *       {@code clusteringColumns=[[}. The matcher would then incorrectly pick up any
 *       {@code ["anyString"]} structure occurring between that snippet and the actual
 *       Databricks-managed {@code clusteringColumns} property.</li>
 *   <li>A user-specified custom table property that contains the snippet
 *       {@code clusteringColumns=} while no clustering is configured on the table.</li>
 * </ul>
 * This approach should be rewritten once tblProperties is parsed into a Map collection.
 *
 * @param tblProperties the tblProperties map in raw string format, as returned in the
 *                      result set of a DESCRIBE TABLE EXTENDED query
 * @return comma-separated clustering columns extracted from tblProperties
 */
private String parseClusterColumns(String tblProperties) {
    // Each column is serialized as ["columnName"]; group(1) captures the bare name
    // without the surrounding quotes and brackets.
    Pattern pattern = Pattern.compile("\\[\"([^\"]*)\"\\]");
    Matcher matcher = pattern.matcher(getClusteringColumnsSubstring(tblProperties));
    StringBuilder clusterColumnNames = new StringBuilder();
    // No try/catch needed: group(1) after a successful find() cannot throw
    // IllegalStateException, and the previous catch merely swallowed it anyway.
    while (matcher.find()) {
        if (clusterColumnNames.length() > 0) {
            clusterColumnNames.append(',');
        }
        clusterColumnNames.append(matcher.group(1));
    }
    return clusterColumnNames.toString();
}

/**
 * Returns the substring of tblProperties spanning the Databricks-managed
 * {@code clusteringColumns=[[...]]} property, including its terminating {@code "]],} marker.
 * Callers must ensure tblProperties contains
 * {@code CLUSTERING_INFORMATION_TBL_PROPERTY_START} before invoking this method.
 *
 * @param tblProperties raw tblProperties string known to contain the clustering property
 * @return the full clusteringColumns property snippet
 */
private String getClusteringColumnsSubstring(String tblProperties) {
    int clusterColumnsStartIndex = tblProperties.indexOf(CLUSTERING_INFORMATION_TBL_PROPERTY_START);
    // Search for the terminating snippet only from clusterColumnsStartIndex onward, so an
    // occurrence anywhere before the clusteringColumns property cannot truncate the result.
    int endIndex = tblProperties.indexOf("\"]],", clusterColumnsStartIndex);
    if (endIndex < 0) {
        // Terminating snippet absent (e.g. the property is the last entry, or the column
        // list is empty): take the rest of the string instead of letting the previous
        // `indexOf(...) + 4` arithmetic produce an invalid substring bound.
        return tblProperties.substring(clusterColumnsStartIndex);
    }
    return tblProperties.substring(clusterColumnsStartIndex, endIndex + 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

public class CreateTableGeneratorDatabricks extends CreateTableGenerator {

private static final String CLUSTERING_INFORMATION_TBL_PROPERTY_START = "clusteringColumns=[[";


@Override
public int getPriority() {
Expand Down Expand Up @@ -49,7 +51,7 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG
if ((!StringUtils.isEmpty(thisStatement.getTableFormat()))) {
finalsql.append(" USING ").append(thisStatement.getTableFormat());
} else if (thisStatement.getExtendedTableProperties() != null && StringUtils.isNoneEmpty(thisStatement.getExtendedTableProperties().getTblProperties())) {
finalsql.append(" TBLPROPERTIES (").append(thisStatement.getExtendedTableProperties().getTblProperties()).append(")");
finalsql.append(" TBLPROPERTIES (").append(avoidClusterProperties(thisStatement)).append(")");
} else {
finalsql.append(" USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true)");
}
Expand Down Expand Up @@ -109,4 +111,23 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG

}

/**
 * While we pass TBLPROPERTIES as a raw string into the CREATE TABLE statement, especially
 * during changelog generation, we need to sanitize it from the Databricks-managed
 * 'clusteringColumns' property, otherwise the generated changelog will fail to execute.
 * Parsing tblProperties into an actual Map collection would make this approach safer.
 *
 * @param statement CreateTableStatementDatabricks containing the raw tblProperties string
 * @return tblProperties string without the 'clusteringColumns' property if it was present,
 *         otherwise the untouched raw tblProperties string
 */
private String avoidClusterProperties(CreateTableStatementDatabricks statement) {
    String tblProperties = statement.getExtendedTableProperties().getTblProperties();
    // Single scan replaces the previous contains() + indexOf() double lookup.
    int clusterColumnsStartIndex = tblProperties.indexOf(CLUSTERING_INFORMATION_TBL_PROPERTY_START);
    if (clusterColumnsStartIndex < 0) {
        return tblProperties;
    }
    int endIndex = tblProperties.indexOf("\"]],", clusterColumnsStartIndex);
    if (endIndex < 0) {
        // Terminating snippet absent (property is last/empty): drop through to end of string
        // instead of letting `indexOf(...) + 4` produce an invalid substring bound.
        return tblProperties.substring(0, clusterColumnsStartIndex);
    }
    String replaceString = tblProperties.substring(clusterColumnsStartIndex, endIndex + 4);
    return tblProperties.replace(replaceString, "");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
"name": "test_new",
"type": "int"
}
},
{
"column": {
"name": "test_present_new",
"type": "int"
}
}
],
"clusterColumns": "test_id,test_new"
"clusterColumns": "test_id,test_new,test_present_new"
}
}
],
Expand All @@ -48,6 +54,11 @@
"column": {
"name": "test_id"
}
},
{
"column": {
"name": "test_present_new"
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
<createTable tableName="test_table_clustered_new" >
<column name="test_id" type="int" />
<column name="test_new" type="int"/>
<databricks:clusterColumns>test_id,test_new</databricks:clusterColumns>
<column name="test_present_new" type="int"/>
<databricks:clusterColumns>test_id,test_new,test_present_new</databricks:clusterColumns>
</createTable>
<rollback>
<!-- The dropTable will drop a full table whether it has clustered columns or not. -->
Expand All @@ -23,6 +24,7 @@
<changeSet id="2" author="your.name">
<databricks:alterCluster tableName="test_table_clustered_new">
<databricks:column name="test_id"/>
<databricks:column name="test_present_new"/>
</databricks:alterCluster>
<rollback/>
</changeSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ databaseChangeLog:
- column:
name: test_new
type: int
clusterColumns: test_id, test_new
- column:
name: test_present_new
type: int
clusterColumns: test_id, test_new, test_present_new
rollback:
dropTable:
tableName: test_table_clustered_new
Expand All @@ -25,6 +28,8 @@ databaseChangeLog:
columns:
- column:
name: test_id
- column:
name: test_present_new
rollback:
empty
- changeSet:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,25 @@
{
"snapshot": {
"objects": {
"liquibase.structure.core.Table": [
{
"table": {
"name": "test_table_clustered_new"
}
}
],
"liquibase.structure.core.Column": [
{
"column": {
"name": "test_id"
}
},
{
"column": {
"name": "test_present_new"
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CREATE TABLE main.liquibase_harness_test_ds.test_table_clustered_new (test_id INT, test_new INT) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (test_id, test_new)
ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new CLUSTER BY (test_id)
CREATE TABLE main.liquibase_harness_test_ds.test_table_clustered_new (test_id INT, test_new INT, test_present_new INT) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (test_id, test_new, test_present_new)
ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new CLUSTER BY (test_id,test_present_new)
ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new DROP COLUMN test_new

0 comments on commit 32bd731

Please sign in to comment.