diff --git a/pom.xml b/pom.xml
index b122cfd0e..9a78be19d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
17
UTF-8
UTF-8
- 2.5.0.Beta1
+ 2.7.2.Final
5.9.1
3.1.1
UTF-8
diff --git a/sink-connector-lightweight/dependency-reduced-pom.xml b/sink-connector-lightweight/dependency-reduced-pom.xml
index a9ccbc9ed..37d175050 100644
--- a/sink-connector-lightweight/dependency-reduced-pom.xml
+++ b/sink-connector-lightweight/dependency-reduced-pom.xml
@@ -131,7 +131,7 @@
io.debezium
debezium-connector-mongodb
- 2.7.0.Beta2
+ 2.7.2.Final
test
@@ -326,7 +326,7 @@
UTF-8
3.1.1
17
- 2.7.0.Beta2
+ 2.7.2.Final
io.quarkus.platform
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java
index 3d356957d..90dc38a1f 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java
@@ -86,8 +86,7 @@ public static void main(String[] args) throws Exception {
setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)), props);
- embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
try {
DebeziumEmbeddedRestApi.startRestApi(props, injector, debeziumChangeEventCapture, userProperties);
@@ -141,8 +140,7 @@ public static CompletableFuture startDebeziumEventLoop(Injector injector
Thread.sleep(500);
// embeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
- embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, true);
+ embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, true);
return null;
});
@@ -151,7 +149,7 @@ public static CompletableFuture startDebeziumEventLoop(Injector injector
public static void start(DebeziumRecordParserService recordParserService,
- DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception {
+ Properties props, boolean forceStart) throws Exception {
if(forceStart == true) {
// Reload the configuration file.
@@ -159,7 +157,7 @@ public static void start(DebeziumRecordParserService recordParserService,
loadPropertiesFile(configurationFile);
}
debeziumChangeEventCapture = new DebeziumChangeEventCapture();
- debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, forceStart);
+ debeziumChangeEventCapture.setup(props, recordParserService, forceStart);
}
public static void stop() throws IOException {
@@ -210,8 +208,7 @@ public void run() {
log.info("******* Restarting Event Loop ********");
debeziumChangeEventCapture.stop();
Thread.sleep(3000);
- start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, true);
+ start(injector.getInstance(DebeziumRecordParserService.class), props, true);
} catch (IOException e) {
log.error("**** ERROR: Restarting Event Loop ****", e);
throw new RuntimeException(e);
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java
index d8c0ce5af..b2772f63b 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java
@@ -127,7 +127,7 @@ public static void startRestApi(Properties props, Injector injector,
try {
debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1);
} catch (Exception e) {
- log.error("Client - Error deleting offsets", e);
+ log.error("Client - Error deleting schema history", e);
ctx.result(e.toString());
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
index d5bdb6caf..eeab41d69 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
@@ -25,6 +25,8 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
@@ -164,10 +166,20 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
updateMetrics(DDL, writer);
}
+ /**
+ * Function to get the database name from the SourceRecord.
+ * If the database name is not present in the SourceRecord, then
+ * the database name is set to "system".
+ * Also if a database is overridden in the configuration, then
+ * the database name is set to the overridden database name.
+ * @param sr
+ * @return
+ */
private String getDatabaseName(SourceRecord sr) {
if (sr != null && sr.key() instanceof Struct) {
String recordDbName = (String) ((Struct) sr.key()).get("databaseName");
if (recordDbName != null && !recordDbName.isEmpty()) {
+
return recordDbName;
}
}
@@ -411,8 +423,8 @@ private Pair getDebeziumOffsetStorageDatabaseName(Properties pro
* @return
*/
private Pair getDebeziumSchemaHistoryDatabaseName(Properties props) {
- String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
- JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
+ String tableName = props.getProperty(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING +
+ JdbcSchemaHistoryConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}
@@ -605,10 +617,11 @@ public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
- // Get topic.prefix from config
- String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
- new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "."
- + tableNameDatabaseName.getLeft(),writer);
+ // Get topic.prefix from properies
+ String topicPrefix = props.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
+ // String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
+ // Jdbc adds the database name to the table name, so we need to remove it
+ new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableName, writer);
}
/**
@@ -770,7 +783,7 @@ public void connectorStopped() {
* @param debeziumRecordParserService
*/
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
- DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {
+ boolean forceStart) throws IOException, ClassNotFoundException {
// Check if max queue size was defined by the user.
if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java
index 61d0ae702..903f92878 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java
@@ -2,6 +2,8 @@
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.logging.Logger;
+import com.clickhouse.logging.LoggerFactory;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -32,6 +34,7 @@ public class DebeziumOffsetStorage {
public static final String SOURCE_PASSWORD = "source_password";
+ private static final Logger log = LoggerFactory.getLogger(DebeziumOffsetStorage.class);
public String getOffsetKey(Properties props) {
String connectorName = props.getProperty("name");
@@ -62,7 +65,8 @@ public void deleteSchemaHistoryTable(String offsetKey,
BaseDbWriter writer) throws SQLException {
- String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey);
+ String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey);
+ log.info("Deleting schema history table query: " + debeziumStorageStatusQuery);
writer.executeQuery(debeziumStorageStatusQuery);
}
/**
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
index b930f4a0c..98e5b2399 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
@@ -54,16 +54,8 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
- // databaseName might contain backticks. Remove them.
- if(databaseName.contains("`")) {
- databaseName = databaseName.replace("`", "");
- }
- if(sourceToDestinationMap.containsKey(databaseName)) {
- this.databaseName = sourceToDestinationMap.get(databaseName);
- } else {
- this.databaseName = databaseName;
- }
+ this.databaseName = overrideDatabaseName(databaseName);
this.query = transformedQuery;
this.tableName = tableName;
@@ -74,6 +66,23 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
this.userProvidedTimeZone = parseTimeZone();
}
+ /**
+ * Function to override the database name.
+ * @param databaseName
+ * @return
+ */
+ private String overrideDatabaseName(String databaseName) {
+
+ // databaseName might contain backticks. Remove them.
+ if(databaseName.contains("`")) {
+ databaseName = databaseName.replace("`", "");
+ }
+
+ if(sourceToDestinationMap.containsKey(databaseName)) {
+ return sourceToDestinationMap.get(databaseName);
+ }
+ return databaseName;
+ }
public ZoneId parseTimeZone() {
String userProvidedTimeZone = config.getString(ClickHouseSinkConnectorConfigVariables
@@ -102,25 +111,9 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase
String databaseName = tree.getText();
if(!databaseName.isEmpty()) {
- // Check if the database is overridden
- Map sourceToDestinationMap = new HashMap<>();
-
- try {
- if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null)
- sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config.
- getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()));
- } catch(Exception e) {
- log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
- }
- // databaseName might contain backticks. Remove them.
- if(databaseName.contains("`")) {
- databaseName = databaseName.replace("`", "");
- }
- if(sourceToDestinationMap.containsKey(databaseName)) {
- this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName)));
- } else {
- this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
- }
+
+ String overrideDatabaseName = overrideDatabaseName(tree.getText());
+ this.query.append(String.format(Constants.CREATE_DATABASE, overrideDatabaseName));
}
}
}
@@ -219,7 +212,10 @@ private Set parseCreateTable(MySqlParser.CreateTableContext ctx, StringB
this.tableName = tree.getText();
// If tableName already includes the database name don't include database name in this.query
if(tableName.contains(".")) {
- this.query.append(tableName);
+ // split tableName into databaseName and tableName
+ String[] tableNameSplit = tableName.split("\\.");
+ this.query.append(this.databaseName).append(".").append(tableNameSplit[1]);
+ //this.query.append(tableName);
} else
this.query.append(databaseName).append(".").append(tree.getText());
@@ -366,8 +362,7 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree
MySqlParser.DataTypeContext dtc = ((MySqlParser.ColumnDefinitionContext) colDefTree).dataType();
DataType dt = DataTypeConverter.getDataType(dtc);
- if(dt.name().equalsIgnoreCase("ENUM"))
- {
+ if(dt.name().equalsIgnoreCase("ENUM") || dt.name().equalsIgnoreCase("SET")) {
// Dont try to get precision/scale for enums
}
else if(parsedDataType.contains("(") && parsedDataType.contains(")") && parsedDataType.contains(",") ) {
@@ -736,8 +731,12 @@ public void enterRenameTable(MySqlParser.RenameTableContext renameTableContext)
originalTableName = renameTableContextChildren.get(0).getText();
newTableName = renameTableContextChildren.get(2).getText();
// If the table name already includes the database name dont include it in the query.
- if(originalTableName.contains(".")) {
- this.query.append(originalTableName).append(" to ").append(newTableName);
+ if(originalTableName.contains(".") && newTableName.contains(".")) {
+ // Split database and table name.
+ String[] databaseAndTableNameArray = originalTableName.split("\\.");
+ String[] newDatabaseAndTableNameArray = newTableName.split("\\.");
+ this.query.append(this.databaseName).append(".").append(databaseAndTableNameArray[1]).append(" to ").
+ append(this.databaseName).append(".").append(newDatabaseAndTableNameArray[1]);
} else
this.query.append(databaseName).append(".").append(originalTableName).append(" to ").
append(databaseName).append(".").append(newTableName);
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java
index b6a694a7f..a9f044582 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java
@@ -81,9 +81,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"), false);
+ engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java
index d09e6da0c..5df4eebec 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java
@@ -84,8 +84,7 @@ public void testPgOutputPlugin() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
+ engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java
index c24ac4241..dcc5b5063 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java
@@ -116,6 +116,7 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
+ //defaultProps.setProperty("ddl.retry", "true");
defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java
index 6a575ab3f..97f1b9148 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java
@@ -84,8 +84,7 @@ public void testMultipleDatabases() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
+ engine.get().setup(props, new SourceRecordParserService(),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java
index cb7348279..6444b11a9 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java
@@ -75,9 +75,7 @@ public void testMySQLGeneratedColumns() throws Exception {
Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"), false);
+ engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java
index ccf9409ea..6712ae23f 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java
@@ -78,8 +78,7 @@ public void testMultipleDatabases() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
+ engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java
index 0c89e3020..c8419da2d 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java
@@ -93,8 +93,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
+ engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java
index 4189bb5a5..96ac528fc 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java
@@ -82,8 +82,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
+ engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java
index 5a419e681..45ce26625 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java
@@ -105,8 +105,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
+ engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java
index 3fda8fbb8..2ea19ea19 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java
@@ -87,8 +87,7 @@ public void testMultipleSchemaReplication() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
+ engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
index 08ea89ba8..70fae4413 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
@@ -91,8 +91,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
+ engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java
index 21e839d84..2ec6a53b1 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java
@@ -93,8 +93,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
+ engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java
index fc5278dd8..f0fe12e1b 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java
@@ -96,8 +96,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
+ engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java
index 65a385561..6cb2c1e9c 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java
@@ -95,9 +95,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"), false);
+ engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java
index b829176b6..7a1e76ab1 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApiIT.java
@@ -100,8 +100,8 @@ public void testRESTAPI(String clickHouseServerVersion) throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
+ engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService()
+ ,false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java
index 0635be944..9e729d3ce 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java
@@ -75,8 +75,7 @@ public void testBatchRetryOnCHFailure() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java
index bdc18d15d..d0d1e7105 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java
@@ -76,8 +76,7 @@ public void testClickHouseDelayedStart() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
@@ -136,8 +135,7 @@ public void debeziumStorageView() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java
index c711c9ff0..5ab95eb1b 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java
@@ -86,8 +86,7 @@ public void testDatabaseOverride() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
@@ -114,6 +113,7 @@ public void testDatabaseOverride() throws Exception {
conn.prepareStatement("create table customers.custtable(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute();
conn.prepareStatement("insert into customers.custtable values('a', 1, 1)").execute();
+
Thread.sleep(10000);
// Validate in Clickhouse the last record written is 29999
@@ -148,6 +148,9 @@ public void testDatabaseOverride() throws Exception {
assertTrue(customersCol2 == 1);
+ Thread.sleep(10000);
+ // Execute the query in MySQL to rename table.
+
clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close();
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java
index 5a615e623..465268279 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java
@@ -86,8 +86,7 @@ public void testDatabaseOverride() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java
new file mode 100644
index 000000000..182429f34
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java
@@ -0,0 +1,215 @@
+package com.altinity.clickhouse.debezium.embedded.cdc;
+
+import com.altinity.clickhouse.debezium.embedded.AppInjector;
+import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication;
+import com.altinity.clickhouse.debezium.embedded.ITCommon;
+import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
+import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties;
+import static org.junit.Assert.assertTrue;
+
+public class DatabaseOverrideRRMTIT {
+
+ private static final Logger log = LoggerFactory.getLogger(DatabaseOverrideRRMTIT.class);
+
+
+ protected MySQLContainer mySqlContainer;
+ static ClickHouseContainer clickHouseContainer;
+
+ static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2"))
+ .withExposedPorts(2181).withAccessToHost(true);
+
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+
+ Network network = Network.newNetwork();
+ zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper");
+ zookeeperContainer.start();
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+// .withInitScript("15k_tables_mysql.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_schema_only_column_timezone.sql")
+ // .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY)
+ .withExposedPorts(8123)
+ .waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort()));
+ clickHouseContainer.withNetwork(network).withNetworkAliases("clickhouse");
+ clickHouseContainer.start();
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ clickHouseContainer.start();
+ Thread.sleep(35000);
+ }
+
+
+ @DisplayName("Test that validates overriding database name in ClickHouse for ReplicatedReplacingMergeTree(RRMT)")
+ @Test
+ public void testDatabaseOverride() throws Exception {
+
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "system");
+ ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
+ clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
+
+ writer.executeQuery("CREATE DATABASE employees2");
+ writer.executeQuery("CREATE DATABASE productsnew");
+
+ Thread.sleep(10000);
+ Injector injector = Guice.createInjector(new AppInjector());
+
+ Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ props.setProperty("snapshot.mode", "schema_only");
+ props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true");
+ props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");
+ props.setProperty("clickhouse.database.override.map", "employees:employees2, products:productsnew");
+ props.setProperty("database.include.list", "employees, products, customers");
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "true");
+ props.setProperty("ddl.retry", "true");
+ // Override clickhouse server timezone.
+ ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
+ DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
+ , new Properties());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ });
+
+ Thread.sleep(25000);
+
+ // Employees table
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+ conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute();
+
+ // Insert a new row in the table
+ conn.prepareStatement("insert into newtable values('a', 1, 1)").execute();
+
+
+ conn.prepareStatement("create database products").execute();
+ conn.prepareStatement("create table products.prodtable(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute();
+ conn.prepareStatement("insert into products.prodtable values('a', 1, 1)").execute();
+
+ conn.prepareStatement("create database customers").execute();
+ conn.prepareStatement("create table customers.custtable(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute();
+ conn.prepareStatement("insert into customers.custtable values('a', 1, 1)").execute();
+
+
+ Thread.sleep(10000);
+
+ // Validate in Clickhouse the last record written is 29999
+
+
+ long col2 = 0L;
+ ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from employees2.newtable final where col1 = 'a'");
+ while(version1Result.next()) {
+ col2 = version1Result.getLong("col2");
+ }
+ Thread.sleep(10000);
+ assertTrue(col2 == 1);
+
+ long productsCol2 = 0L;
+ ResultSet productsVersionResult = writer.executeQueryWithResultSet("select col2 from productsnew.prodtable final where col1 = 'a'");
+ while(productsVersionResult.next()) {
+ productsCol2 = productsVersionResult.getLong("col2");
+ }
+ assertTrue(productsCol2 == 1);
+ Thread.sleep(10000);
+
+ long customersCol2 = 0L;
+ ResultSet customersVersionResult = writer.executeQueryWithResultSet("select col2 from customers.custtable final where col1 = 'a'");
+ while(customersVersionResult.next()) {
+ customersCol2 = customersVersionResult.getLong("col2");
+ }
+ assertTrue(customersCol2 == 1);
+
+
+ Thread.sleep(10000);
+ // Execute the query in MySQL to rename table.
+ conn.prepareStatement("use products").execute();
+ conn.prepareStatement("rename table prodtable to prodtable2").execute();
+ Thread.sleep(10000);
+// ResultSet customersVersionResult2 = writer.executeQueryWithResultSet("select col2 from customers.custtable2 final where col1 = 'a'");
+// while(customersVersionResult2.next()) {
+// customersCol2 = customersVersionResult2.getLong("col2");
+// }
+// assertTrue(customersCol2 == 2);
+
+ // validate that the table prodtaable2 is present in clickhouse
+ ResultSet chRs = writer.executeQueryWithResultSet("select * from productsnew.prodtable2");
+ boolean recordFound = false;
+ while(chRs.next()) {
+ recordFound = true;
+ assert chRs.getString("col1").equalsIgnoreCase("a");
+ //assert rs.getString("name").equalsIgnoreCase("test");
+ }
+
+ assertTrue(recordFound);
+
+
+ // Execute mysql to rename from prodtabl2 to prodtable3 without database prefix.
+ conn.prepareStatement("rename table prodtable2 to prodtable3").execute();
+
+ Thread.sleep(10000);
+ // Validate on CH that the table prodtable3 is present.
+ chRs = writer.executeQueryWithResultSet("select * from productsnew.prodtable3");
+ boolean prod3RecordFound = false;
+ while(chRs.next()) {
+ prod3RecordFound = true;
+ assert chRs.getString("col1").equalsIgnoreCase("a");
+ //assert rs.getString("name").equalsIgnoreCase("test");
+ }
+ assertTrue(prod3RecordFound);
+ clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close();
+
+ conn.close();
+ // Files.deleteIfExists( tmpFilePath);
+ executorService.shutdown();
+ }
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java
index 6e5948729..0fffe2464 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java
@@ -78,8 +78,7 @@ public void testLoadingTablesInSchemaOnlyMode() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java
index d49c277cc..f3c8f2853 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java
@@ -123,8 +123,7 @@ public void testIncrementingSequenceNumbers() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java
index 368aec496..dc5066fb5 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java
@@ -73,8 +73,7 @@ public void debeziumStorageView() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java
index 3f4c8d73e..87d347f42 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java
@@ -85,8 +85,7 @@ public void testColumnMismatch() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java
index 14b8f3170..f2a47165e 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java
@@ -92,8 +92,7 @@ public void testIncrementingSequenceNumberWithUpdates() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java
index 6ef0bd20a..98234cd8a 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java
@@ -84,7 +84,7 @@ public void testColumnMismatch() throws Exception {
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java
index a82932f36..207340903 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java
@@ -81,8 +81,7 @@ public void testRestClient() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
- injector.getInstance(DDLParserService.class), props, false);
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java
index f038d8971..66d77a177 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java
@@ -1,8 +1,10 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.apache.log4j.BasicConfigurator;
@@ -18,6 +20,7 @@
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@@ -50,10 +53,12 @@ public void testAddColumn() throws Exception {
executorService.execute(() -> {
try {
+ Properties properties = getDebeziumProperties();
+ // Add ddl.retry to true
+ //properties.put(SinkConnectorLightWeightConfig.DDL_RETRY, "true");
+
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"), false);
+ engine.get().setup(properties, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java
index e7ccf74f9..33bbf1740 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java
@@ -48,8 +48,7 @@ public void testChangeColumn() throws Exception {
executorService.execute(() -> {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java
index 8ac671fcd..746139f18 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java
@@ -48,9 +48,7 @@ public void testModifyColumn() throws Exception {
executorService.execute(() -> {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java
index 898f1bb47..149930ee9 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java
@@ -78,9 +78,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"),false);
+ engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java
index 663e181b6..c739ff918 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java
@@ -57,9 +57,7 @@ public void testCreateTable() throws Exception {
props.setProperty("database.include.list", "datatypes");
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService() , false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java
index 93bdc1dc6..aa7418596 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java
@@ -69,8 +69,8 @@ public void testCreateTable() throws Exception {
props.setProperty("database.include.list", "datatypes");
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false);
+ engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService()
+ , false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java
index 51409f8e0..412c94828 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java
@@ -2,6 +2,7 @@
import com.altinity.clickhouse.debezium.embedded.ITCommon;
+import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import org.apache.log4j.BasicConfigurator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -60,8 +61,11 @@ protected Connection connectToMySQL() {
protected Properties getDebeziumProperties() throws Exception {
- return ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ props.put(SinkConnectorLightWeightConfig.DDL_RETRY, "true");
+
+ return props;
}
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java
index 785145afa..a9f3ac7e7 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java
@@ -109,8 +109,7 @@ public void testSchemaOnlyMode() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java
index 599f82372..5c6ec0249 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java
@@ -70,8 +70,7 @@ public void testCreateTable() throws Exception {
props.setProperty("database.include.list", "datatypes");
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false);
+ engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java
index 6ade4e240..27230aa48 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java
@@ -72,8 +72,7 @@ public void testCreateTable() throws Exception {
props.setProperty("database.include.list", "datatypes");
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java
index 34985b83f..887b04d62 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java
@@ -70,9 +70,7 @@ public void testCreateTable() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "datatypes"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java
index cb3bf551e..0920e706a 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java
@@ -74,8 +74,7 @@ public void testCreateTable() throws Exception {
props.setProperty("database.include.list", "datatypes");
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java
index 760e452d8..e5d8d654e 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java
@@ -62,9 +62,7 @@ public void testEmployeesDB() throws Exception {
executorService.execute(() -> {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
- "employees"), false);
+ engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java
index 43fcd20ce..28382c53b 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java
@@ -74,8 +74,8 @@ public void testIsDeleted(String clickHouseServerVersion) throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
+ engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService()
+ ,false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java
index 0126b6293..07db78f4f 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java
@@ -78,8 +78,7 @@ public void testMultipleDatabases() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(props, new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
+ engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
index eb688a0f1..442cfc3cf 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
@@ -14,6 +14,7 @@
import org.junit.jupiter.params.provider.CsvSource;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +50,18 @@ public void testCreateTableWithEnum() {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE employees.employees_predated(emp_no Int32 NOT NULL ,birth_date Date32 NOT NULL ,first_name String NOT NULL ,last_name String NOT NULL ,gender String NOT NULL ,hire_date Date32 NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (emp_no)"));
log.info("Create table " + clickHouseQuery);
}
+
+ @Test
+ public void testCreateTableWithSetDataType() {
+
+ String createQuery = "CREATE TABLE example(options SET('a', 'b', 'c', 'd'))";
+ StringBuffer clickHouseQuery = new StringBuffer();
+
+ mySQLDDLParserService.parseSql(createQuery, "test", clickHouseQuery);
+ Assert.assertTrue("CREATE TABLE employees.example(options Nullable(String),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()".equalsIgnoreCase(clickHouseQuery.toString()));
+ ;
+ }
+
@Test
public void testCreateTableWithRangeByColumnsPartition() {
String createQuery = "CREATE TABLE rcx ( a INT, b INT, c CHAR(3), d INT) PARTITION BY RANGE COLUMNS(a,d,c) ( PARTITION p0 VALUES LESS THAN (5,10,'ggg'), PARTITION p1 VALUES LESS THAN (10,20,'mmm'), " +
@@ -214,6 +227,25 @@ public void testAutoCreateTableWithCHTimezoneUpperCaseDateTime() {
log.info("Create table " + clickHouseQuery);
}
+ @Test
+ public void testCreateTableWithReplicatedReplacingMergeTree() {
+
+ StringBuffer clickHouseQuery = new StringBuffer();
+ String createDB = "CREATE TABLE IF NOT EXISTS mysql1.`table_7220f7bd_8c8c_11ef_94db_67ff65f7711d` (id INT NOT NULL,col1 varchar(255), col2 int, PRIMARY KEY (id)) ENGINE = InnoDB))";
+
+ // Set ClickHouse sink connector config to set replicated tables.
+ Map config = new HashMap<>();
+ config.put(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
+ config.put(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString(), "mysql1:ch1");
+
+ ClickHouseSinkConnectorConfig clickHouseSinkConnectorConfig = new ClickHouseSinkConnectorConfig(config);
+ MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(clickHouseSinkConnectorConfig, "ch1");
+ mySQLDDLParserService.parseSql(createDB, "Persons", clickHouseQuery);
+ log.info("Create table " + clickHouseQuery);
+
+ Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE if not exists ch1.`table_7220f7bd_8c8c_11ef_94db_67ff65f7711d` ON CLUSTER `{cluster}`(id Int32 NOT NULL ,col1 Nullable(String),col2 Nullable(Int32),`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree(_version, is_deleted) ORDER BY (id)"));
+
+ }
@Test
public void testCreateTableAutoIncrement() {
StringBuffer clickHouseQuery = new StringBuffer();
@@ -241,6 +273,7 @@ public void testCreateTable() {
log.info("Create table " + clickHouseQuery);
}
+
@Test
public void testCreateTableWithNulLFields() {
StringBuffer clickHouseQuery = new StringBuffer();
@@ -601,6 +634,20 @@ public void renameTable() {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("rename table employees.add_test to employees.add_test_old"));
}
+ @Test
+ public void testRenameTableWithDatabaseOverride() {
+ StringBuffer clickHouseQuery = new StringBuffer();
+
+ HashMap props = new HashMap<>();
+ props.put(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString(), "employees:employees2, products:productsnew");
+ ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(props);
+ MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(config, "employees2");
+
+ String sql = "rename table employees.add_test to employees.add_test_old";
+
+ mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery);
+ Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("rename table employees2.add_test to employees2.add_test_old"));
+ }
@Test
public void testAddIndex() {
StringBuffer clickHouseQuery = new StringBuffer();
@@ -677,7 +724,7 @@ public void renameMultipleTables() {
String sql = "rename /* gh-ost */ table `trade_prod`.`enriched_trade` to `trade_prod`.`_enriched_trade_del`, `trade_prod`.`_enriched_trade_gho` to `trade_prod`.`enriched_trade`\n";
mySQLDDLParserService.parseSql(sql, "", clickHouseQuery);
- Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("RENAME TABLE `trade_prod`.`enriched_trade` to `trade_prod`.`_enriched_trade_del`,`trade_prod`.`_enriched_trade_gho` to `trade_prod`.`enriched_trade`"));
+ Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("RENAME TABLE employees.`enriched_trade` to employees.`_enriched_trade_del`,employees.`_enriched_trade_gho` to employees.`enriched_trade`"));
}
@Test
public void alterTableRenameTable() {
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
index b72d2f8dd..2678f81fe 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
@@ -2,6 +2,7 @@
import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
@@ -13,6 +14,7 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
@@ -22,6 +24,7 @@
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@@ -60,22 +63,27 @@ public void startContainers() throws InterruptedException {
clickHouseContainer.start();
}
@ParameterizedTest
- @CsvSource({
- "clickhouse/clickhouse-server:latest",
- "clickhouse/clickhouse-server:22.3"
- })
+ @ValueSource(booleans = {
+ false,
+ true}
+ )
@DisplayName("Test that validates DDL(Create, ALTER, RENAME)")
- public void testTableOperations(String clickHouseServerVersion) throws Exception {
+ public void testTableOperations(boolean databaseOverride) throws Exception {
AtomicReference engine = new AtomicReference<>();
+ Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ if(databaseOverride) {
+ props.setProperty("clickhouse.database.override.map", "employees:ch_employees, datatypes:ch_datatypes, public:ch_public, project:ch_project");
+ }
+ DebeziumChangeEventCapture debeziumChangeEventCapture = new DebeziumChangeEventCapture();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
- engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"),false);
+ engine.set(debeziumChangeEventCapture);
+ ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props));
+ engine.get().setup(props, new SourceRecordParserService(),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -100,11 +108,15 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
"PARTITIONS 6;").execute();
conn.prepareStatement("create table copied_table like new_table").execute();
conn.prepareStatement("CREATE TABLE rcx ( a INT not null, b INT, c CHAR(3) not null, d INT not null) PARTITION BY RANGE COLUMNS(a,d,c) ( PARTITION p0 VALUES LESS THAN (5,10,'ggg'));").execute();
+
+ // insert a new row to new_table
+ conn.prepareStatement("insert into new_table values('a', 1, 1)").execute();
+
Thread.sleep(10000);
conn.prepareStatement("\n" +
- "CREATE TABLE contacts (id INT AUTO_INCREMENT PRIMARY KEY,\n" +
+ "CREATE TABLE contacts (id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,\n" +
"first_name VARCHAR(50) NOT NULL,\n" +
"last_name VARCHAR(50) NOT NULL,\n" +
"fullname varchar(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)),\n" +
@@ -131,24 +143,7 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
// Validate table created with partitions.
String membersResult = writer.executeQuery("show create table members");
-
- if(clickHouseServerVersion.contains("latest")) {
- Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" +
- "(\n" +
- " `firstname` String,\n" +
- " `lastname` String,\n" +
- " `username` String,\n" +
- " `email` Nullable(String),\n" +
- " `joined` Date32,\n" +
- " `_version` UInt64,\n" +
- " `is_deleted` UInt8\n" +
- ")\n" +
- "ENGINE = ReplacingMergeTree(_version, is_deleted)\n" +
- "PARTITION BY joined\n" +
- "ORDER BY tuple()\n" +
- "SETTINGS index_granularity = 8192"));
- } else {
- Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" +
+ Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" +
"(\n" +
" `firstname` String,\n" +
" `lastname` String,\n" +
@@ -162,12 +157,10 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
"PARTITION BY joined\n" +
"ORDER BY tuple()\n" +
"SETTINGS index_granularity = 8192"));
- }
String rcxResult = writer.executeQuery("show create table rcx");
- if(clickHouseServerVersion.contains("latest")) {
- Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" +
+ Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" +
"(\n" +
" `a` Int32,\n" +
" `b` Nullable(Int32),\n" +
@@ -180,8 +173,10 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
"PARTITION BY (a, d, c)\n" +
"ORDER BY tuple()\n" +
"SETTINGS index_granularity = 8192"));
- }
+ Thread.sleep(10000);
+ // Delete offset table.
+ debeziumChangeEventCapture.deleteOffsets(props);
if(engine.get() != null) {
engine.get().stop();
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java
index 626d68b2b..62518cabe 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java
@@ -71,8 +71,7 @@ public void testIsDeleted() throws Exception {
try {
engine.set(new DebeziumChangeEventCapture());
- engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
- new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
+ engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService() ,false);
} catch (Exception e) {
throw new RuntimeException(e);
}