diff --git a/pom.xml b/pom.xml index b122cfd0e..9a78be19d 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ 17 UTF-8 UTF-8 - 2.5.0.Beta1 + 2.7.2.Final 5.9.1 3.1.1 UTF-8 diff --git a/sink-connector-lightweight/dependency-reduced-pom.xml b/sink-connector-lightweight/dependency-reduced-pom.xml index a9ccbc9ed..37d175050 100644 --- a/sink-connector-lightweight/dependency-reduced-pom.xml +++ b/sink-connector-lightweight/dependency-reduced-pom.xml @@ -131,7 +131,7 @@ io.debezium debezium-connector-mongodb - 2.7.0.Beta2 + 2.7.2.Final test @@ -326,7 +326,7 @@ UTF-8 3.1.1 17 - 2.7.0.Beta2 + 2.7.2.Final io.quarkus.platform diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java index 3d356957d..90dc38a1f 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java @@ -86,8 +86,7 @@ public static void main(String[] args) throws Exception { setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)), props); - embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); try { DebeziumEmbeddedRestApi.startRestApi(props, injector, debeziumChangeEventCapture, userProperties); @@ -141,8 +140,7 @@ public static CompletableFuture startDebeziumEventLoop(Injector injector Thread.sleep(500); // embeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); - embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, true); + embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, true); return null; }); @@ -151,7 +149,7 @@ public static CompletableFuture startDebeziumEventLoop(Injector injector public static void start(DebeziumRecordParserService recordParserService, - DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception { + Properties props, boolean forceStart) throws Exception { if(forceStart == true) { // Reload the configuration file. @@ -159,7 +157,7 @@ public static void start(DebeziumRecordParserService recordParserService, loadPropertiesFile(configurationFile); } debeziumChangeEventCapture = new DebeziumChangeEventCapture(); - debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, forceStart); + debeziumChangeEventCapture.setup(props, recordParserService, forceStart); } public static void stop() throws IOException { @@ -210,8 +208,7 @@ public void run() { log.info("******* Restarting Event Loop ********"); debeziumChangeEventCapture.stop(); Thread.sleep(3000); - start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, true); + start(injector.getInstance(DebeziumRecordParserService.class), props, true); } catch (IOException e) { log.error("**** ERROR: Restarting Event Loop ****", e); throw new RuntimeException(e); diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java index d8c0ce5af..b2772f63b 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java @@ -127,7 +127,7 @@ public static void startRestApi(Properties props, Injector injector, try { debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1); } catch (Exception e) { - log.error("Client - Error deleting offsets", e); + log.error("Client - Error deleting schema history", e); ctx.result(e.toString()); ctx.status(HttpStatus.INTERNAL_SERVER_ERROR); return; diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index d5bdb6caf..eeab41d69 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -25,6 +25,8 @@ import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig; import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.connect.data.Field; @@ -164,10 +166,20 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr, updateMetrics(DDL, writer); } + /** + * Function to get the database name from the SourceRecord. + * If the database name is not present in the SourceRecord, then + * the database name is set to "system". + * Also if a database is overridden in the configuration, then + * the database name is set to the overridden database name. + * @param sr + * @return + */ private String getDatabaseName(SourceRecord sr) { if (sr != null && sr.key() instanceof Struct) { String recordDbName = (String) ((Struct) sr.key()).get("databaseName"); if (recordDbName != null && !recordDbName.isEmpty()) { + return recordDbName; } } @@ -411,8 +423,8 @@ private Pair getDebeziumOffsetStorageDatabaseName(Properties pro * @return */ private Pair getDebeziumSchemaHistoryDatabaseName(Properties props) { - String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + - JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); + String tableName = props.getProperty(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + + JdbcSchemaHistoryConfig.PROP_TABLE_NAME.name()); return splitTableName(tableName); } @@ -605,10 +617,11 @@ public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, this.conn); - // Get topic.prefix from config - String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name()); - new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "." - + tableNameDatabaseName.getLeft(),writer); + // Get topic.prefix from properies + String topicPrefix = props.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name()); + // String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name()); + // Jdbc adds the database name to the table name, so we need to remove it + new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableName, writer); } /** @@ -770,7 +783,7 @@ public void connectorStopped() { * @param debeziumRecordParserService */ public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService, - DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException { + boolean forceStart) throws IOException, ClassNotFoundException { // Check if max queue size was defined by the user. if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) { diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java index 61d0ae702..903f92878 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java @@ -2,6 +2,8 @@ import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.clickhouse.logging.Logger; +import com.clickhouse.logging.LoggerFactory; import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -32,6 +34,7 @@ public class DebeziumOffsetStorage { public static final String SOURCE_PASSWORD = "source_password"; + private static final Logger log = LoggerFactory.getLogger(DebeziumOffsetStorage.class); public String getOffsetKey(Properties props) { String connectorName = props.getProperty("name"); @@ -62,7 +65,8 @@ public void deleteSchemaHistoryTable(String offsetKey, BaseDbWriter writer) throws SQLException { - String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey); + String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey); + log.info("Deleting schema history table query: " + debeziumStorageStatusQuery); writer.executeQuery(debeziumStorageStatusQuery); } /** diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java index b930f4a0c..98e5b2399 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java @@ -54,16 +54,8 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ } catch(Exception e) { log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString()); } - // databaseName might contain backticks. Remove them. - if(databaseName.contains("`")) { - databaseName = databaseName.replace("`", ""); - } - if(sourceToDestinationMap.containsKey(databaseName)) { - this.databaseName = sourceToDestinationMap.get(databaseName); - } else { - this.databaseName = databaseName; - } + this.databaseName = overrideDatabaseName(databaseName); this.query = transformedQuery; this.tableName = tableName; @@ -74,6 +66,23 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ this.userProvidedTimeZone = parseTimeZone(); } + /** + * Function to override the database name. + * @param databaseName + * @return + */ + private String overrideDatabaseName(String databaseName) { + + // databaseName might contain backticks. Remove them. + if(databaseName.contains("`")) { + databaseName = databaseName.replace("`", ""); + } + + if(sourceToDestinationMap.containsKey(databaseName)) { + return sourceToDestinationMap.get(databaseName); + } + return databaseName; + } public ZoneId parseTimeZone() { String userProvidedTimeZone = config.getString(ClickHouseSinkConnectorConfigVariables @@ -102,25 +111,9 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase String databaseName = tree.getText(); if(!databaseName.isEmpty()) { - // Check if the database is overridden - Map sourceToDestinationMap = new HashMap<>(); - - try { - if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null) - sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config. - getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString())); - } catch(Exception e) { - log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString()); - } - // databaseName might contain backticks. Remove them. - if(databaseName.contains("`")) { - databaseName = databaseName.replace("`", ""); - } - if(sourceToDestinationMap.containsKey(databaseName)) { - this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName))); - } else { - this.query.append(String.format(Constants.CREATE_DATABASE, databaseName)); - } + + String overrideDatabaseName = overrideDatabaseName(tree.getText()); + this.query.append(String.format(Constants.CREATE_DATABASE, overrideDatabaseName)); } } } @@ -219,7 +212,10 @@ private Set parseCreateTable(MySqlParser.CreateTableContext ctx, StringB this.tableName = tree.getText(); // If tableName already includes the database name don't include database name in this.query if(tableName.contains(".")) { - this.query.append(tableName); + // split tableName into databaseName and tableName + String[] tableNameSplit = tableName.split("\\."); + this.query.append(this.databaseName).append(".").append(tableNameSplit[1]); + //this.query.append(tableName); } else this.query.append(databaseName).append(".").append(tree.getText()); @@ -366,8 +362,7 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree MySqlParser.DataTypeContext dtc = ((MySqlParser.ColumnDefinitionContext) colDefTree).dataType(); DataType dt = DataTypeConverter.getDataType(dtc); - if(dt.name().equalsIgnoreCase("ENUM")) - { + if(dt.name().equalsIgnoreCase("ENUM") || dt.name().equalsIgnoreCase("SET")) { // Dont try to get precision/scale for enums } else if(parsedDataType.contains("(") && parsedDataType.contains(")") && parsedDataType.contains(",") ) { @@ -736,8 +731,12 @@ public void enterRenameTable(MySqlParser.RenameTableContext renameTableContext) originalTableName = renameTableContextChildren.get(0).getText(); newTableName = renameTableContextChildren.get(2).getText(); // If the table name already includes the database name dont include it in the query. - if(originalTableName.contains(".")) { - this.query.append(originalTableName).append(" to ").append(newTableName); + if(originalTableName.contains(".") && newTableName.contains(".")) { + // Split database and table name. + String[] databaseAndTableNameArray = originalTableName.split("\\."); + String[] newDatabaseAndTableNameArray = newTableName.split("\\."); + this.query.append(this.databaseName).append(".").append(databaseAndTableNameArray[1]).append(" to "). + append(this.databaseName).append(".").append(newDatabaseAndTableNameArray[1]); } else this.query.append(databaseName).append(".").append(originalTableName).append(" to "). append(databaseName).append(".").append(newTableName); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java index b6a694a7f..a9f044582 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java @@ -81,9 +81,7 @@ public void testDecoderBufsPlugin() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"), false); + engine.get().setup(getProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java index d09e6da0c..5df4eebec 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java @@ -84,8 +84,7 @@ public void testPgOutputPlugin() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false); + engine.get().setup(getProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java index c24ac4241..dcc5b5063 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java @@ -116,6 +116,7 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort())); defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername()); defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword()); + //defaultProps.setProperty("ddl.retry", "true"); defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s", clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java index 6a575ab3f..97f1b9148 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java @@ -84,8 +84,7 @@ public void testMultipleDatabases() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false); + engine.get().setup(props, new SourceRecordParserService(),false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java index cb7348279..6444b11a9 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java @@ -75,9 +75,7 @@ public void testMySQLGeneratedColumns() throws Exception { Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"), false); + engine.get().setup(props, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java index ccf9409ea..6712ae23f 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java @@ -78,8 +78,7 @@ public void testMultipleDatabases() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false); + engine.get().setup(props, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java index 0c89e3020..c8419da2d 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java @@ -93,8 +93,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false); + engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java index 4189bb5a5..96ac528fc 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java @@ -82,8 +82,7 @@ public void testDecoderBufsPlugin() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false); + engine.get().setup(getProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java index 5a419e681..45ce26625 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java @@ -105,8 +105,7 @@ public void testDecoderBufsPlugin() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false); + engine.get().setup(getProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java index 3fda8fbb8..2ea19ea19 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java @@ -87,8 +87,7 @@ public void testMultipleSchemaReplication() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false); + engine.get().setup(getProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java index 08ea89ba8..70fae4413 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java @@ -91,8 +91,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false); + engine.get().setup(props, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java index 21e839d84..2ec6a53b1 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java @@ -93,8 +93,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false); + engine.get().setup(props, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java index fc5278dd8..f0fe12e1b 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java @@ -96,8 +96,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false); + engine.get().setup(props, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java index 65a385561..6cb2c1e9c 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java @@ -95,9 +95,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"), false); + engine.get().setup(props, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java index b829176b6..7a1e76ab1 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java @@ -100,8 +100,8 @@ public void testRESTAPI(String clickHouseServerVersion) throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false); + engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService() + ,false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java index 0635be944..9e729d3ce 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java @@ -75,8 +75,7 @@ public void testBatchRetryOnCHFailure() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java index bdc18d15d..d0d1e7105 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java @@ -76,8 +76,7 @@ public void testClickHouseDelayedStart() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { @@ -136,8 +135,7 @@ public void debeziumStorageView() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java index c711c9ff0..5ab95eb1b 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java @@ -86,8 +86,7 @@ public void testDatabaseOverride() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { @@ -114,6 +113,7 @@ public void testDatabaseOverride() throws Exception { conn.prepareStatement("create table customers.custtable(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute(); conn.prepareStatement("insert into customers.custtable values('a', 1, 1)").execute(); + Thread.sleep(10000); // Validate in Clickhouse the last record written is 29999 @@ -148,6 +148,9 @@ public void testDatabaseOverride() throws Exception { assertTrue(customersCol2 == 1); + Thread.sleep(10000); + // Execute the query in MySQL to rename table. + clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close(); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java index 5a615e623..465268279 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java @@ -86,8 +86,7 @@ public void testDatabaseOverride() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java new file mode 100644 index 000000000..182429f34 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java @@ -0,0 +1,215 @@ +package com.altinity.clickhouse.debezium.embedded.cdc; + +import com.altinity.clickhouse.debezium.embedded.AppInjector; +import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; +import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.clickhouse.jdbc.ClickHouseConnection; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.log4j.BasicConfigurator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; +import static org.junit.Assert.assertTrue; + +public class DatabaseOverrideRRMTIT { + + private static final Logger log = LoggerFactory.getLogger(DatabaseOverrideRRMTIT.class); + + + protected MySQLContainer mySqlContainer; + static ClickHouseContainer clickHouseContainer; + + static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2")) + .withExposedPorts(2181).withAccessToHost(true); + + @BeforeEach + public void startContainers() throws InterruptedException { + + Network network = Network.newNetwork(); + zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper"); + zookeeperContainer.start(); + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") +// .withInitScript("15k_tables_mysql.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_schema_only_column_timezone.sql") + // .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml") + .withUsername("ch_user") + .withPassword("password") + .withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY) + .withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY) + .withExposedPorts(8123) + .waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort())); + clickHouseContainer.withNetwork(network).withNetworkAliases("clickhouse"); + clickHouseContainer.start(); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(35000); + } + + + @DisplayName("Test that validates overriding database name in ClickHouse for ReplicatedReplacingMergeTree(RRMT)") + @Test + public void testDatabaseOverride() throws Exception { + + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "system"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + + writer.executeQuery("CREATE DATABASE employees2"); + writer.executeQuery("CREATE DATABASE productsnew"); + + Thread.sleep(10000); + Injector injector = Guice.createInjector(new AppInjector()); + + Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); + props.setProperty("snapshot.mode", "schema_only"); + props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true"); + props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true"); + props.setProperty("clickhouse.database.override.map", "employees:employees2, products:productsnew"); + props.setProperty("database.include.list", "employees, products, customers"); + props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true"); + props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "true"); + props.setProperty("ddl.retry", "true"); + // Override clickhouse server timezone. + ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); + + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); + DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() + , new Properties()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + }); + + Thread.sleep(25000); + + // Employees table + Connection conn = ITCommon.connectToMySQL(mySqlContainer); + conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute(); + + // Insert a new row in the table + conn.prepareStatement("insert into newtable values('a', 1, 1)").execute(); + + + conn.prepareStatement("create database products").execute(); + conn.prepareStatement("create table products.prodtable(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute(); + conn.prepareStatement("insert into products.prodtable values('a', 1, 1)").execute(); + + conn.prepareStatement("create database customers").execute(); + conn.prepareStatement("create table customers.custtable(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute(); + conn.prepareStatement("insert into customers.custtable values('a', 1, 1)").execute(); + + + Thread.sleep(10000); + + // Validate in Clickhouse the last record written is 29999 + + + long col2 = 0L; + ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from employees2.newtable final where col1 = 'a'"); + while(version1Result.next()) { + col2 = version1Result.getLong("col2"); + } + Thread.sleep(10000); + assertTrue(col2 == 1); + + long productsCol2 = 0L; + ResultSet productsVersionResult = writer.executeQueryWithResultSet("select col2 from productsnew.prodtable final where col1 = 'a'"); + while(productsVersionResult.next()) { + productsCol2 = productsVersionResult.getLong("col2"); + } + assertTrue(productsCol2 == 1); + Thread.sleep(10000); + + long customersCol2 = 0L; + ResultSet customersVersionResult = writer.executeQueryWithResultSet("select col2 from customers.custtable final where col1 = 'a'"); + while(customersVersionResult.next()) { + customersCol2 = customersVersionResult.getLong("col2"); + } + assertTrue(customersCol2 == 1); + + + Thread.sleep(10000); + // Execute the query in MySQL to rename table. + conn.prepareStatement("use products").execute(); + conn.prepareStatement("rename table prodtable to prodtable2").execute(); + Thread.sleep(10000); +// ResultSet customersVersionResult2 = writer.executeQueryWithResultSet("select col2 from customers.custtable2 final where col1 = 'a'"); +// while(customersVersionResult2.next()) { +// customersCol2 = customersVersionResult2.getLong("col2"); +// } +// assertTrue(customersCol2 == 2); + + // validate that the table prodtaable2 is present in clickhouse + ResultSet chRs = writer.executeQueryWithResultSet("select * from productsnew.prodtable2"); + boolean recordFound = false; + while(chRs.next()) { + recordFound = true; + assert chRs.getString("col1").equalsIgnoreCase("a"); + //assert rs.getString("name").equalsIgnoreCase("test"); + } + + assertTrue(recordFound); + + + // Execute mysql to rename from prodtabl2 to prodtable3 without database prefix. + conn.prepareStatement("rename table prodtable2 to prodtable3").execute(); + + Thread.sleep(10000); + // Validate on CH that the table prodtable3 is present. + chRs = writer.executeQueryWithResultSet("select * from productsnew.prodtable3"); + boolean prod3RecordFound = false; + while(chRs.next()) { + prod3RecordFound = true; + assert chRs.getString("col1").equalsIgnoreCase("a"); + //assert rs.getString("name").equalsIgnoreCase("test"); + } + assertTrue(prod3RecordFound); + clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close(); + + conn.close(); + // Files.deleteIfExists( tmpFilePath); + executorService.shutdown(); + } +} diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java index 6e5948729..0fffe2464 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java @@ -78,8 +78,7 @@ public void testLoadingTablesInSchemaOnlyMode() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java index d49c277cc..f3c8f2853 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java @@ -123,8 +123,7 @@ public void testIncrementingSequenceNumbers() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java index 368aec496..dc5066fb5 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java @@ -73,8 +73,7 @@ public void debeziumStorageView() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java index 3f4c8d73e..87d347f42 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java @@ -85,8 +85,7 @@ public void testColumnMismatch() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java index 14b8f3170..f2a47165e 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java @@ -92,8 +92,7 @@ public void testIncrementingSequenceNumberWithUpdates() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java index 6ef0bd20a..98234cd8a 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java @@ -84,7 +84,7 @@ public void testColumnMismatch() throws Exception { executorService.execute(() -> { try { clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java index a82932f36..207340903 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java @@ -81,8 +81,7 @@ public void testRestClient() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false); DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() , new Properties()); } catch (Exception e) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java index f038d8971..66d77a177 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java @@ -1,8 +1,10 @@ package com.altinity.clickhouse.debezium.embedded.ddl.parser; import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; import com.clickhouse.jdbc.ClickHouseConnection; import org.apache.log4j.BasicConfigurator; @@ -18,6 +20,7 @@ import java.sql.Connection; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -50,10 +53,12 @@ public void testAddColumn() throws Exception { executorService.execute(() -> { try { + Properties properties = getDebeziumProperties(); + // Add ddl.retry to true + //properties.put(SinkConnectorLightWeightConfig.DDL_RETRY, "true"); + engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"), false); + engine.get().setup(properties, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java index e7ccf74f9..33bbf1740 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java @@ -48,8 +48,7 @@ public void testChangeColumn() throws Exception { executorService.execute(() -> { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java index 8ac671fcd..746139f18 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java @@ -48,9 +48,7 @@ public void testModifyColumn() throws Exception { executorService.execute(() -> { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java index 898f1bb47..149930ee9 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java @@ -78,9 +78,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"),false); + engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java index 663e181b6..c739ff918 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java @@ -57,9 +57,7 @@ public void testCreateTable() throws Exception { props.setProperty("database.include.list", "datatypes"); engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService() , false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java index 93bdc1dc6..aa7418596 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java @@ -69,8 +69,8 @@ public void testCreateTable() throws Exception { props.setProperty("database.include.list", "datatypes"); engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false); + engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService() + , false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java index 51409f8e0..412c94828 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java @@ -2,6 +2,7 @@ import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; import org.apache.log4j.BasicConfigurator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -60,8 +61,11 @@ protected Connection connectToMySQL() { protected Properties getDebeziumProperties() throws Exception { - return ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer); + Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer); + props.put(SinkConnectorLightWeightConfig.DDL_RETRY, "true"); + + return props; } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java index 785145afa..a9f3ac7e7 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java @@ -109,8 +109,7 @@ public void testSchemaOnlyMode() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java index 599f82372..5c6ec0249 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java @@ -70,8 +70,7 @@ public void testCreateTable() throws Exception { props.setProperty("database.include.list", "datatypes"); engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false); + engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java index 6ade4e240..27230aa48 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java @@ -72,8 +72,7 @@ public void testCreateTable() throws Exception { props.setProperty("database.include.list", "datatypes"); engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java index 34985b83f..887b04d62 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java @@ -70,9 +70,7 @@ public void testCreateTable() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "datatypes"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java index cb3bf551e..0920e706a 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java @@ -74,8 +74,7 @@ public void testCreateTable() throws Exception { props.setProperty("database.include.list", "datatypes"); engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java index 760e452d8..e5d8d654e 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java @@ -62,9 +62,7 @@ public void testEmployeesDB() throws Exception { executorService.execute(() -> { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), - "employees"), false); + engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java index 43fcd20ce..28382c53b 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java @@ -74,8 +74,8 @@ public void testIsDeleted(String clickHouseServerVersion) throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false); + engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService() + ,false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java index 0126b6293..07db78f4f 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java @@ -78,8 +78,7 @@ public void testMultipleDatabases() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(props, new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false); + engine.get().setup(props, new SourceRecordParserService(), false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java index eb688a0f1..442cfc3cf 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java @@ -14,6 +14,7 @@ import org.junit.jupiter.params.provider.CsvSource; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,6 +50,18 @@ public void testCreateTableWithEnum() { Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE employees.employees_predated(emp_no Int32 NOT NULL ,birth_date Date32 NOT NULL ,first_name String NOT NULL ,last_name String NOT NULL ,gender String NOT NULL ,hire_date Date32 NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (emp_no)")); log.info("Create table " + clickHouseQuery); } + + @Test + public void testCreateTableWithSetDataType() { + + String createQuery = "CREATE TABLE example(options SET('a', 'b', 'c', 'd'))"; + StringBuffer clickHouseQuery = new StringBuffer(); + + mySQLDDLParserService.parseSql(createQuery, "test", clickHouseQuery); + Assert.assertTrue("CREATE TABLE employees.example(options Nullable(String),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()".equalsIgnoreCase(clickHouseQuery.toString())); + ; + } + @Test public void testCreateTableWithRangeByColumnsPartition() { String createQuery = "CREATE TABLE rcx ( a INT, b INT, c CHAR(3), d INT) PARTITION BY RANGE COLUMNS(a,d,c) ( PARTITION p0 VALUES LESS THAN (5,10,'ggg'), PARTITION p1 VALUES LESS THAN (10,20,'mmm'), " + @@ -214,6 +227,25 @@ public void testAutoCreateTableWithCHTimezoneUpperCaseDateTime() { log.info("Create table " + clickHouseQuery); } + @Test + public void testCreateTableWithReplicatedReplacingMergeTree() { + + StringBuffer clickHouseQuery = new StringBuffer(); + String createDB = "CREATE TABLE IF NOT EXISTS mysql1.`table_7220f7bd_8c8c_11ef_94db_67ff65f7711d` (id INT NOT NULL,col1 varchar(255), col2 int, PRIMARY KEY (id)) ENGINE = InnoDB))"; + + // Set ClickHouse sink connector config to set replicated tables. + Map config = new HashMap<>(); + config.put(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true"); + config.put(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString(), "mysql1:ch1"); + + ClickHouseSinkConnectorConfig clickHouseSinkConnectorConfig = new ClickHouseSinkConnectorConfig(config); + MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(clickHouseSinkConnectorConfig, "ch1"); + mySQLDDLParserService.parseSql(createDB, "Persons", clickHouseQuery); + log.info("Create table " + clickHouseQuery); + + Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE if not exists ch1.`table_7220f7bd_8c8c_11ef_94db_67ff65f7711d` ON CLUSTER `{cluster}`(id Int32 NOT NULL ,col1 Nullable(String),col2 Nullable(Int32),`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree(_version, is_deleted) ORDER BY (id)")); + + } @Test public void testCreateTableAutoIncrement() { StringBuffer clickHouseQuery = new StringBuffer(); @@ -241,6 +273,7 @@ public void testCreateTable() { log.info("Create table " + clickHouseQuery); } + @Test public void testCreateTableWithNulLFields() { StringBuffer clickHouseQuery = new StringBuffer(); @@ -601,6 +634,20 @@ public void renameTable() { Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("rename table employees.add_test to employees.add_test_old")); } + @Test + public void testRenameTableWithDatabaseOverride() { + StringBuffer clickHouseQuery = new StringBuffer(); + + HashMap props = new HashMap<>(); + props.put(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString(), "employees:employees2, products:productsnew"); + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(props); + MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(config, "employees2"); + + String sql = "rename table employees.add_test to employees.add_test_old"; + + mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery); + Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("rename table employees2.add_test to employees2.add_test_old")); + } @Test public void testAddIndex() { StringBuffer clickHouseQuery = new StringBuffer(); @@ -677,7 +724,7 @@ public void renameMultipleTables() { String sql = "rename /* gh-ost */ table `trade_prod`.`enriched_trade` to `trade_prod`.`_enriched_trade_del`, `trade_prod`.`_enriched_trade_gho` to `trade_prod`.`enriched_trade`\n"; mySQLDDLParserService.parseSql(sql, "", clickHouseQuery); - Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("RENAME TABLE `trade_prod`.`enriched_trade` to `trade_prod`.`_enriched_trade_del`,`trade_prod`.`_enriched_trade_gho` to `trade_prod`.`enriched_trade`")); + Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("RENAME TABLE employees.`enriched_trade` to employees.`_enriched_trade_del`,employees.`_enriched_trade_gho` to employees.`enriched_trade`")); } @Test public void alterTableRenameTable() { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java index b72d2f8dd..2678f81fe 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java @@ -2,6 +2,7 @@ import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; @@ -13,6 +14,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; @@ -22,6 +24,7 @@ import java.sql.Connection; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -60,22 +63,27 @@ public void startContainers() throws InterruptedException { clickHouseContainer.start(); } @ParameterizedTest - @CsvSource({ - "clickhouse/clickhouse-server:latest", - "clickhouse/clickhouse-server:22.3" - }) + @ValueSource(booleans = { + false, + true} + ) @DisplayName("Test that validates DDL(Create, ALTER, RENAME)") - public void testTableOperations(String clickHouseServerVersion) throws Exception { + public void testTableOperations(boolean databaseOverride) throws Exception { AtomicReference engine = new AtomicReference<>(); + Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer); + if(databaseOverride) { + props.setProperty("clickhouse.database.override.map", "employees:ch_employees, datatypes:ch_datatypes, public:ch_public, project:ch_project"); + } + DebeziumChangeEventCapture debeziumChangeEventCapture = new DebeziumChangeEventCapture(); ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { - engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"),false); + engine.set(debeziumChangeEventCapture); + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)); + engine.get().setup(props, new SourceRecordParserService(),false); } catch (Exception e) { throw new RuntimeException(e); } @@ -100,11 +108,15 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception "PARTITIONS 6;").execute(); conn.prepareStatement("create table copied_table like new_table").execute(); conn.prepareStatement("CREATE TABLE rcx ( a INT not null, b INT, c CHAR(3) not null, d INT not null) PARTITION BY RANGE COLUMNS(a,d,c) ( PARTITION p0 VALUES LESS THAN (5,10,'ggg'));").execute(); + + // insert a new row to new_table + conn.prepareStatement("insert into new_table values('a', 1, 1)").execute(); + Thread.sleep(10000); conn.prepareStatement("\n" + - "CREATE TABLE contacts (id INT AUTO_INCREMENT PRIMARY KEY,\n" + + "CREATE TABLE contacts (id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,\n" + "first_name VARCHAR(50) NOT NULL,\n" + "last_name VARCHAR(50) NOT NULL,\n" + "fullname varchar(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)),\n" + @@ -131,24 +143,7 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception // Validate table created with partitions. String membersResult = writer.executeQuery("show create table members"); - - if(clickHouseServerVersion.contains("latest")) { - Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" + - "(\n" + - " `firstname` String,\n" + - " `lastname` String,\n" + - " `username` String,\n" + - " `email` Nullable(String),\n" + - " `joined` Date32,\n" + - " `_version` UInt64,\n" + - " `is_deleted` UInt8\n" + - ")\n" + - "ENGINE = ReplacingMergeTree(_version, is_deleted)\n" + - "PARTITION BY joined\n" + - "ORDER BY tuple()\n" + - "SETTINGS index_granularity = 8192")); - } else { - Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" + + Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" + "(\n" + " `firstname` String,\n" + " `lastname` String,\n" + @@ -162,12 +157,10 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception "PARTITION BY joined\n" + "ORDER BY tuple()\n" + "SETTINGS index_granularity = 8192")); - } String rcxResult = writer.executeQuery("show create table rcx"); - if(clickHouseServerVersion.contains("latest")) { - Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" + + Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" + "(\n" + " `a` Int32,\n" + " `b` Nullable(Int32),\n" + @@ -180,8 +173,10 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception "PARTITION BY (a, d, c)\n" + "ORDER BY tuple()\n" + "SETTINGS index_granularity = 8192")); - } + Thread.sleep(10000); + // Delete offset table. + debeziumChangeEventCapture.deleteOffsets(props); if(engine.get() != null) { engine.get().stop(); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java index 626d68b2b..62518cabe 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java @@ -71,8 +71,7 @@ public void testIsDeleted() throws Exception { try { engine.set(new DebeziumChangeEventCapture()); - engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), - new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false); + engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService() ,false); } catch (Exception e) { throw new RuntimeException(e); }