Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAT-18792: added cluster by support for snapshot and diff table related change types #199

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;

public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {
import java.util.regex.Pattern;

public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {

@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
Expand All @@ -34,12 +35,14 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(
missingObject.getAttribute("Location", String.class),
missingObject.getAttribute("tblProperties", String.class));
String clusterColumns = missingObject.getAttribute("clusterColumns", "");

changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes);
changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes, clusterColumns);
return changes;
}

private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties, Change[] changes) {
private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties,
Change[] changes, String clusterColumns) {
CreateTableChange temp = (CreateTableChange) changes[0];
CreateTableChangeDatabricks createTableChangeDatabricks = new CreateTableChangeDatabricks();
createTableChangeDatabricks.setColumns(temp.getColumns());
Expand All @@ -51,6 +54,9 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
createTableChangeDatabricks.setRemarks(temp.getRemarks());
createTableChangeDatabricks.setIfNotExists(temp.getIfNotExists());
createTableChangeDatabricks.setRowDependencies(temp.getRowDependencies());
if (!clusterColumns.isEmpty()) {
createTableChangeDatabricks.setClusterColumns(sanitizeClusterColumns(clusterColumns));
}

createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties);
return createTableChangeDatabricks;
Expand All @@ -60,4 +66,9 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
// Factory hook inherited from MissingTableChangeGenerator: returns the
// Databricks-specific change type so that diff-generated CREATE TABLE changes
// can carry extended attributes (tblProperties, cluster columns).
protected CreateTableChange createCreateTableChange() {
    return new CreateTableChangeDatabricks();
}
}

// Matches the JSON-ish noise characters ('[', ']', '"') that wrap the
// clusterColumns attribute value; compiled once instead of per call.
private static final Pattern CLUSTER_COLUMNS_NOISE = Pattern.compile("[\\[\\]\\\"]");

/**
 * Strips square brackets and double quotes from the raw clusterColumns
 * attribute value, leaving a plain comma-separated column list.
 *
 * @param clusterColumnProperty raw attribute value, e.g. {@code ["a","b"]}
 * @return the value with '[', ']' and '"' characters removed
 */
private String sanitizeClusterColumns(String clusterColumnProperty) {
    // Use the precompiled Pattern directly. The previous code called
    // String.replaceAll(pattern.toString(), ""), which discarded the compiled
    // Pattern and recompiled the regex on every invocation.
    return CLUSTER_COLUMNS_NOISE.matcher(clusterColumnProperty).replaceAll("");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TableSnapshotGeneratorDatabricks extends TableSnapshotGenerator {

private static final String LOCATION = "Location";
private static final String TABLE_PROPERTIES = "Table Properties";
private static final String TBL_PROPERTIES = "tblProperties";
private static final String CLUSTER_COLUMNS = "clusterColumns";
private static final String DETAILED_TABLE_INFORMATION_NODE = "# Detailed Table Information";

@Override
Expand Down Expand Up @@ -49,13 +50,32 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot
if (detailedInformationNode && tableProperty.get("COL_NAME").equals(LOCATION)) {
table.setAttribute(LOCATION, tableProperty.get("DATA_TYPE"));
}
if (detailedInformationNode && tableProperty.get("COL_NAME").equals(TABLE_PROPERTIES)) {
String tblProperties = (String) tableProperty.get("DATA_TYPE");
table.setAttribute(TBL_PROPERTIES, tblProperties.substring(1, tblProperties.length() - 1));// remove starting and ending square brackets
}
}
Map<String, String> tblProperties = getTblPropertiesMap(database, example.getName());
if (tblProperties.containsKey(CLUSTER_COLUMNS)) {
// used remove, as clusterColumns tblProperty is not allowed in create/alter table statements
table.setAttribute(CLUSTER_COLUMNS, tblProperties.remove(CLUSTER_COLUMNS));
}
table.setAttribute(TBL_PROPERTIES, getTblPropertiesString(tblProperties));
}
return table;
}

/**
 * Queries Databricks for all table properties of the given table via
 * {@code SHOW TBLPROPERTIES} and returns them as a key/value map.
 *
 * @param database the database to run the query against (default catalog/schema are used)
 * @param table    the unqualified table name
 * @return map of property name to property value
 * @throws DatabaseException if the query fails
 */
private Map<String, String> getTblPropertiesMap(Database database, String table) throws DatabaseException {
    // NOTE(review): catalog/schema/table names are concatenated straight into the SQL
    // text; these come from the database/snapshot itself, but quoting/escaping of
    // unusual identifiers should be confirmed.
    String query = String.format("SHOW TBLPROPERTIES %s.%s.%s;", database.getDefaultCatalogName(), database.getDefaultSchemaName(), table);
    List<Map<String, ?>> tablePropertiesResponse = Scope.getCurrentScope().getSingleton(ExecutorService.class)
            .getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query));
    // Merge function keeps the last value: Collectors.toMap without one throws
    // IllegalStateException if the result ever contains a duplicate KEY.
    return tablePropertiesResponse.stream()
            .collect(Collectors.toMap(
                    mapElement -> (String) mapElement.get("KEY"),
                    mapElement -> (String) mapElement.get("VALUE"),
                    (first, second) -> second));
}

/**
 * Serializes a properties map as a comma-separated list of {@code key=value}
 * pairs, sorted by key for deterministic output (e.g. {@code a=1,b=2}).
 * Returns an empty string for an empty map.
 *
 * @param propertiesMap table properties to serialize
 * @return sorted CSV representation without a trailing comma
 */
private String getTblPropertiesString(Map<String, String> propertiesMap) {
    // Collectors.joining replaces the previous StringBuilder-plus-regex approach
    // (append a comma after every entry, then strip the trailing one with ",$").
    return propertiesMap.entrySet()
            .stream()
            .sorted(Map.Entry.comparingByKey())
            .map(entry -> entry.getKey() + "=" + entry.getValue())
            .collect(Collectors.joining(","));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

public class CreateTableGeneratorDatabricks extends CreateTableGenerator {

private static final String CLUSTERING_INFORMATION_TBL_PROPERTY_START = "clusteringColumns=[[";


@Override
public int getPriority() {
Expand Down Expand Up @@ -49,7 +51,7 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG
if ((!StringUtils.isEmpty(thisStatement.getTableFormat()))) {
finalsql.append(" USING ").append(thisStatement.getTableFormat());
} else if (thisStatement.getExtendedTableProperties() != null && StringUtils.isNoneEmpty(thisStatement.getExtendedTableProperties().getTblProperties())) {
finalsql.append(" TBLPROPERTIES (").append(thisStatement.getExtendedTableProperties().getTblProperties()).append(")");
finalsql.append(" TBLPROPERTIES (").append(avoidClusterProperties(thisStatement)).append(")");
} else {
finalsql.append(" USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true)");
}
Expand Down Expand Up @@ -109,4 +111,23 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG

}

/**
 * While TBLPROPERTIES are passed as a raw string into the create table statement (notably
 * during changelog generation), the 'clusteringColumns' property must be stripped out:
 * it is not allowed in CREATE/ALTER TABLE statements, so a generated changelog containing
 * it would fail to execute.
 * Parsing tblProperties into an actual Map structure would make this approach safer and easier.
 *
 * @param statement CreateTableStatementDatabricks containing the raw tblProperties string
 * @return tblProperties without the 'clusteringColumns' property if it was present,
 *         otherwise the untouched raw tblProperties string
 */
private String avoidClusterProperties(CreateTableStatementDatabricks statement) {
    String tblProperties = statement.getExtendedTableProperties().getTblProperties();
    int start = tblProperties.indexOf(CLUSTERING_INFORMATION_TBL_PROPERTY_START);
    if (start < 0) {
        return tblProperties;
    }
    // Locate the end of the clusteringColumns value, which looks like
    // clusteringColumns=[["col1"],["col2"]]
    int terminator = tblProperties.indexOf("\"]]", start);
    if (terminator < 0) {
        // Unexpected format: return the string untouched rather than corrupting it.
        // (The previous code searched for '"]],' and threw StringIndexOutOfBoundsException
        // when the property was the last entry, i.e. had no trailing comma.)
        return tblProperties;
    }
    int end = terminator + 3; // include the closing '"]]'
    if (end < tblProperties.length() && tblProperties.charAt(end) == ',') {
        end++; // property is followed by another entry: remove the separating comma too
    } else if (start > 0 && tblProperties.charAt(start - 1) == ',') {
        start--; // property is the last entry: remove the comma that precedes it instead
    }
    return tblProperties.substring(0, start) + tblProperties.substring(end);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
"name": "test_new",
"type": "int"
}
},
{
"column": {
"name": "test_present_new",
"type": "int"
}
}
],
"clusterColumns": "test_id,test_new"
"clusterColumns": "test_id,test_new,test_present_new"
}
}
],
Expand All @@ -48,6 +54,11 @@
"column": {
"name": "test_id"
}
},
{
"column": {
"name": "test_present_new"
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
<createTable tableName="test_table_clustered_new" >
<column name="test_id" type="int" />
<column name="test_new" type="int"/>
<databricks:clusterColumns>test_id,test_new</databricks:clusterColumns>
<column name="test_present_new" type="int"/>
<databricks:clusterColumns>test_id,test_new,test_present_new</databricks:clusterColumns>
</createTable>
<rollback>
<!-- The dropTable will drop a full table whether it has clustered columns or not. -->
Expand All @@ -23,6 +24,7 @@
<changeSet id="2" author="your.name">
<databricks:alterCluster tableName="test_table_clustered_new">
<databricks:column name="test_id"/>
<databricks:column name="test_present_new"/>
</databricks:alterCluster>
<rollback/>
</changeSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ databaseChangeLog:
- column:
name: test_new
type: int
clusterColumns: test_id, test_new
- column:
name: test_present_new
type: int
clusterColumns: test_id, test_new, test_present_new
rollback:
dropTable:
tableName: test_table_clustered_new
Expand All @@ -25,6 +28,8 @@ databaseChangeLog:
columns:
- column:
name: test_id
- column:
name: test_present_new
rollback:
empty
- changeSet:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,25 @@
{
"snapshot": {
"objects": {
"liquibase.structure.core.Table": [
{
"table": {
"name": "test_table_clustered_new"
}
}
],
"liquibase.structure.core.Column": [
{
"column": {
"name": "test_id"
}
},
{
"column": {
"name": "test_present_new"
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CREATE TABLE main.liquibase_harness_test_ds.test_table_clustered_new (test_id INT, test_new INT) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (test_id, test_new)
ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new CLUSTER BY (test_id)
CREATE TABLE main.liquibase_harness_test_ds.test_table_clustered_new (test_id INT, test_new INT, test_present_new INT) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (test_id, test_new, test_present_new)
ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new CLUSTER BY (test_id,test_present_new)
ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new DROP COLUMN test_new
Loading