Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DAT-18261] feat: snapshotting of extended view properties #191

Merged
merged 14 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package liquibase.ext.databricks.diff.output.changelog;

import liquibase.change.Change;
import liquibase.change.core.CreateViewChange;
import liquibase.database.Database;
import liquibase.diff.output.DiffOutputControl;
import liquibase.diff.output.changelog.ChangeGeneratorChain;
import liquibase.diff.output.changelog.core.MissingViewChangeGenerator;
import liquibase.ext.databricks.change.createView.CreateViewChangeDatabricks;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;

/**
* Custom implementation of {@link MissingViewChangeGenerator} for Databricks.
*/
public class MissingViewChangeGeneratorDatabricks extends MissingViewChangeGenerator {

@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
if (database instanceof DatabricksDatabase && View.class.isAssignableFrom(objectType)) {
return PRIORITY_DATABASE;
} else {
return PRIORITY_NONE;
}
}

@Override
public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl control, Database referenceDatabase, Database comparisonDatabase, ChangeGeneratorChain chain) {
Change[] changes = super.fixMissing(missingObject, control, referenceDatabase, comparisonDatabase, chain);
if (changes == null || changes.length == 0) {
return changes;
}
changes[0] = getCreateViewChangeDatabricks(missingObject.getAttribute("tblProperties", String.class), changes);
return changes;
}

private CreateViewChangeDatabricks getCreateViewChangeDatabricks(String tblProperties, Change[] changes) {
CreateViewChange temp = (CreateViewChange) changes[0];
CreateViewChangeDatabricks createViewChangeDatabricks = new CreateViewChangeDatabricks();
createViewChangeDatabricks.setViewName(temp.getViewName());
createViewChangeDatabricks.setSelectQuery(temp.getSelectQuery());
createViewChangeDatabricks.setReplaceIfExists(temp.getReplaceIfExists());
createViewChangeDatabricks.setSchemaName(temp.getSchemaName());
createViewChangeDatabricks.setCatalogName(temp.getCatalogName());
createViewChangeDatabricks.setRemarks(temp.getRemarks());
createViewChangeDatabricks.setFullDefinition(temp.getFullDefinition());
createViewChangeDatabricks.setPath(temp.getPath());
createViewChangeDatabricks.setRelativeToChangelogFile(temp.getRelativeToChangelogFile());
createViewChangeDatabricks.setEncoding(temp.getEncoding());
createViewChangeDatabricks.setTblProperties(tblProperties);
return createViewChangeDatabricks;
}

@Override
protected CreateViewChange createViewChange() {
return new CreateViewChangeDatabricks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,28 @@
import liquibase.Scope;
import liquibase.database.AbstractJdbcDatabase;
import liquibase.database.Database;
import liquibase.database.DatabaseConnection;
import liquibase.exception.DatabaseException;
import liquibase.executor.ExecutorService;
import liquibase.ext.databricks.database.DatabricksConnection;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.snapshot.DatabaseSnapshot;
import liquibase.snapshot.jvm.ViewSnapshotGenerator;
import liquibase.statement.core.RawSqlStatement;
import liquibase.statement.core.RawParameterizedSqlStatement;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Schema;
import liquibase.structure.core.View;
import org.apache.commons.lang3.StringUtils;

import java.sql.ResultSet;
import java.util.List;
import java.util.Map;

import liquibase.ext.databricks.database.DatabricksDatabase;
import org.apache.commons.lang3.StringUtils;

/**
* Overrides ViewSnapshotGenerator for Databricks views contemplating the tblProperties field
*/
public class ViewSnapshotGeneratorDatabricks extends ViewSnapshotGenerator {


@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
if (database instanceof DatabricksDatabase) {
return super.getPriority(objectType, database) + PRIORITY_DATABASE;
} else {
return PRIORITY_NONE;
}
return database instanceof DatabricksDatabase ? PRIORITY_DATABASE : PRIORITY_NONE;
}

@Override
Expand All @@ -41,74 +35,57 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot
} else {
Database database = snapshot.getDatabase();
Schema schema = example.getSchema();
DatabaseConnection connection = database.getConnection();

CatalogAndSchema catalogAndSchema = (new CatalogAndSchema(schema.getCatalogName(), schema.getName())).customize(database);
String jdbcSchemaName = database.correctObjectName(((AbstractJdbcDatabase) database).getJdbcSchemaName(catalogAndSchema), Schema.class);
String query = String.format("SELECT view_definition FROM %s.%s.VIEWS WHERE table_name='%s' AND table_schema='%s' AND table_catalog='%s';",
schema.getCatalogName(), database.getSystemSchema(), example.getName(), schema.getName(), schema.getCatalogName());

// DEBUG
//System.out.println("Snapshot Database Connection URL : " + database.getConnection().getURL());
//System.out.println("Snapshot Database Connection Class : " + database.getConnection().getClass().getName());

String query = String.format("SELECT view_definition FROM %s.%s.VIEWS WHERE table_name=? AND table_schema=? AND table_catalog=?",
schema.getCatalogName(), database.getSystemSchema());

List<Map<String, ?>> viewsMetadataRs = Scope.getCurrentScope().getSingleton(ExecutorService.class)
.getExecutor("jdbc", database).queryForList(new RawSqlStatement(query));

// New Code, likely superfluous, was used for testing
/// This should use our existing DatabaseConnection url processing
String rawViewDefinition = null;

try (ResultSet viewMetadataResultSet = ((DatabricksConnection) connection).createStatement().executeQuery(query)) {
//System.out.println("Raw Result VIEW " + viewMetadataResultSet);

viewMetadataResultSet.next();
rawViewDefinition = viewMetadataResultSet.getString(1);


} catch (Exception e) {
Scope.getCurrentScope().getLog(getClass()).info("Error getting View Definiton via existing context, going to pull from URL", e);
}

/// Old Code
.getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query, example.getName(), schema.getName(), schema.getCatalogName()));

if (viewsMetadataRs.isEmpty()) {
return null;
} else {

Map<String, ?> row = viewsMetadataRs.get(0);
String rawViewName = example.getName();
String viewName = this.cleanNameFromDatabase(example.getName(), database);
String rawSchemaName = schema.getName();
String rawCatalogName = schema.getCatalogName();


View view = (new View()).setName(this.cleanNameFromDatabase(rawViewName, database));
CatalogAndSchema schemaFromJdbcInfo = ((AbstractJdbcDatabase) database).getSchemaFromJdbcInfo(rawCatalogName, rawSchemaName);
view.setSchema(new Schema(schemaFromJdbcInfo.getCatalogName(), schemaFromJdbcInfo.getSchemaName()));

String definition = rawViewDefinition;
View view = (View) new View()
.setName(viewName)
.setSchema(new Schema(schemaFromJdbcInfo.getCatalogName(), schemaFromJdbcInfo.getSchemaName()))
.setAttribute("tblProperties", this.getTblProperties(database, viewName));
view.setDefinition(getViewDefinition(viewsMetadataRs));

if (definition == null || definition.isEmpty()) {
definition = (String) row.get("view_definition");

}
return view;
}
}
}

int length = definition.length();
if (length > 0 && definition.charAt(length - 1) == 0) {
definition = definition.substring(0, length - 1);
}
private String getViewDefinition(List<Map<String, ?>> viewsMetadataRs) {
Map<String, ?> row = viewsMetadataRs.get(0);
String definition = (String) row.get("VIEW_DEFINITION");

definition = StringUtils.trimToNull(definition);
if (definition == null) {
definition = "[CANNOT READ VIEW DEFINITION]";
}
int length = definition.length();
if (length > 0 && definition.charAt(length - 1) == 0) {
definition = definition.substring(0, length - 1);
}

view.setDefinition(definition);
definition = StringUtils.trimToNull(definition);
if (definition == null) {
definition = "[CANNOT READ VIEW DEFINITION]";
}
return definition;
}

return view;
}
private String getTblProperties(Database database, String viewName) throws DatabaseException {
String query = String.format("SHOW TBLPROPERTIES %s.%s.%s;", database.getDefaultCatalogName(), database.getDefaultSchemaName(), viewName);
List<Map<String, ?>> tablePropertiesResponse = Scope.getCurrentScope().getSingleton(ExecutorService.class)
.getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query));

}
StringBuilder csvString = new StringBuilder();
tablePropertiesResponse.forEach(tableProperty ->
csvString.append("'").append(tableProperty.get("KEY")).append("'='").append(tableProperty.get("VALUE")).append("', ")
);
return csvString.toString().replaceAll(", $", "");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
liquibase.ext.databricks.diff.output.changelog.MissingViewChangeGeneratorDatabricks
Loading