Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for create table for MySQL Set Data type #881

Open
wants to merge 24 commits into
base: 2.6.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a5ec2af
Added test to cover RENAME table with database.override.map
subkanthi Oct 9, 2024
d4875b0
Upgrade debezium version to 3.0.0.Final
subkanthi Oct 13, 2024
f563469
Merge branch '2.4.0' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 14, 2024
281113c
Merge branch '2.5.0' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 14, 2024
e605355
Add ddl.retry to ddl tests
subkanthi Oct 14, 2024
92dc346
set ddl.retry to true for aall DDL tests
subkanthi Oct 14, 2024
8f0b5da
Merge tag '2.4.0' of github.com:Altinity/clickhouse-sink-connector in…
subkanthi Oct 19, 2024
4864826
Merge branch 'develop' of github.com:Altinity/clickhouse-sink-connect…
subkanthi Oct 19, 2024
b29e5c0
Merge tag '2.4.0' of github.com:Altinity/clickhouse-sink-connector in…
subkanthi Oct 19, 2024
b6bad48
Merge branch 'develop' of github.com:Altinity/clickhouse-sink-connect…
subkanthi Oct 19, 2024
bbe062d
Merge branch '2.5.0' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 19, 2024
f1add2d
Add support for parameterized test to test db override functionality
subkanthi Oct 19, 2024
699ab46
Updated setup signature removed ddlservice dependency
subkanthi Oct 20, 2024
a490017
Fixed rename table for database.override.map
subkanthi Oct 20, 2024
4c91874
Merge tag '2.4.0' of github.com:Altinity/clickhouse-sink-connector in…
subkanthi Oct 20, 2024
a569354
Merge branch '2.4.1' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 20, 2024
2933464
Added assert for DatabaseOverrideIT
subkanthi Oct 20, 2024
6910217
Added Integration test for database override it
subkanthi Oct 20, 2024
25b6192
Fixed database override substitution for create table in RRMT
subkanthi Oct 21, 2024
d18dabc
Fixed DB tests in DatabaseOverrideRRMTIT.java
subkanthi Oct 21, 2024
fb27eab
Reverted back change in DatabaseOverrideIT
subkanthi Oct 22, 2024
57c59fe
Merge pull request #877 from Altinity/database_override_it_rrmt
subkanthi Oct 22, 2024
4b94d0b
Added test for create table for MySQL Set Data type
subkanthi Oct 24, 2024
fb472ec
Add logic to exclude parsing of precision/scale
subkanthi Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<version.junit>5.9.1</version.junit>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
4 changes: 2 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>2.7.0.Beta2</version>
<version>2.7.2.Final</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -326,7 +326,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
<version.debezium>2.7.0.Beta2</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public static void main(String[] args) throws Exception {

setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)), props);

embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);

try {
DebeziumEmbeddedRestApi.startRestApi(props, injector, debeziumChangeEventCapture, userProperties);
Expand Down Expand Up @@ -141,8 +140,7 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector

Thread.sleep(500);
// embeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, true);
return null;
});

Expand All @@ -151,15 +149,15 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector


public static void start(DebeziumRecordParserService recordParserService,
DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception {
Properties props, boolean forceStart) throws Exception {

if(forceStart == true) {
// Reload the configuration file.
log.info(String.format("******* Reloading configuration file (%s) from disk ******", configurationFile));
loadPropertiesFile(configurationFile);
}
debeziumChangeEventCapture = new DebeziumChangeEventCapture();
debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, forceStart);
debeziumChangeEventCapture.setup(props, recordParserService, forceStart);
}

public static void stop() throws IOException {
Expand Down Expand Up @@ -210,8 +208,7 @@ public void run() {
log.info("******* Restarting Event Loop ********");
debeziumChangeEventCapture.stop();
Thread.sleep(3000);
start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
start(injector.getInstance(DebeziumRecordParserService.class), props, true);
} catch (IOException e) {
log.error("**** ERROR: Restarting Event Loop ****", e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static void startRestApi(Properties props, Injector injector,
try {
debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error deleting offsets", e);
log.error("Client - Error deleting schema history", e);
ctx.result(e.toString());
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
Expand Down Expand Up @@ -164,10 +166,20 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
updateMetrics(DDL, writer);
}

/**
* Function to get the database name from the SourceRecord.
* If the database name is not present in the SourceRecord, then
* the database name is set to "system".
* Also if a database is overridden in the configuration, then
* the database name is set to the overridden database name.
* @param sr
* @return
*/
private String getDatabaseName(SourceRecord sr) {
if (sr != null && sr.key() instanceof Struct) {
String recordDbName = (String) ((Struct) sr.key()).get("databaseName");
if (recordDbName != null && !recordDbName.isEmpty()) {

return recordDbName;
}
}
Expand Down Expand Up @@ -411,8 +423,8 @@ private Pair<String, String> getDebeziumOffsetStorageDatabaseName(Properties pro
* @return
*/
private Pair<String, String> getDebeziumSchemaHistoryDatabaseName(Properties props) {
String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
String tableName = props.getProperty(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING +
JdbcSchemaHistoryConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

Expand Down Expand Up @@ -605,10 +617,11 @@ public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

// Get topic.prefix from config
String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "."
+ tableNameDatabaseName.getLeft(),writer);
// Get topic.prefix from properies
String topicPrefix = props.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
// String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
// Jdbc adds the database name to the table name, so we need to remove it
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableName, writer);

}
/**
Expand Down Expand Up @@ -770,7 +783,7 @@ public void connectorStopped() {
* @param debeziumRecordParserService
*/
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {
boolean forceStart) throws IOException, ClassNotFoundException {

// Check if max queue size was defined by the user.
if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;

import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
Expand Down Expand Up @@ -32,6 +34,7 @@ public class DebeziumOffsetStorage {
public static final String SOURCE_PASSWORD = "source_password";


private static final Logger log = LoggerFactory.getLogger(DebeziumOffsetStorage.class);

public String getOffsetKey(Properties props) {
String connectorName = props.getProperty("name");
Expand Down Expand Up @@ -62,7 +65,8 @@ public void deleteSchemaHistoryTable(String offsetKey,
BaseDbWriter writer) throws SQLException {


String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey);
String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey);
log.info("Deleting schema history table query: " + debeziumStorageStatusQuery);
writer.executeQuery(debeziumStorageStatusQuery);
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,8 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
this.databaseName = sourceToDestinationMap.get(databaseName);
} else {
this.databaseName = databaseName;
}
this.databaseName = overrideDatabaseName(databaseName);

this.query = transformedQuery;
this.tableName = tableName;
Expand All @@ -74,6 +66,23 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
this.userProvidedTimeZone = parseTimeZone();
}

/**
* Function to override the database name.
* @param databaseName
* @return
*/
private String overrideDatabaseName(String databaseName) {

// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
return sourceToDestinationMap.get(databaseName);
}
return databaseName;
}

public ZoneId parseTimeZone() {
String userProvidedTimeZone = config.getString(ClickHouseSinkConnectorConfigVariables
Expand Down Expand Up @@ -102,25 +111,9 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase

String databaseName = tree.getText();
if(!databaseName.isEmpty()) {
// Check if the database is overridden
Map<String, String> sourceToDestinationMap = new HashMap<>();

try {
if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null)
sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config.
getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()));
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}
if(sourceToDestinationMap.containsKey(databaseName)) {
this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName)));
} else {
this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
}

String overrideDatabaseName = overrideDatabaseName(tree.getText());
this.query.append(String.format(Constants.CREATE_DATABASE, overrideDatabaseName));
}
}
}
Expand Down Expand Up @@ -219,7 +212,10 @@ private Set<String> parseCreateTable(MySqlParser.CreateTableContext ctx, StringB
this.tableName = tree.getText();
// If tableName already includes the database name don't include database name in this.query
if(tableName.contains(".")) {
this.query.append(tableName);
// split tableName into databaseName and tableName
String[] tableNameSplit = tableName.split("\\.");
this.query.append(this.databaseName).append(".").append(tableNameSplit[1]);
//this.query.append(tableName);
} else
this.query.append(databaseName).append(".").append(tree.getText());

Expand Down Expand Up @@ -366,8 +362,7 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree
MySqlParser.DataTypeContext dtc = ((MySqlParser.ColumnDefinitionContext) colDefTree).dataType();
DataType dt = DataTypeConverter.getDataType(dtc);

if(dt.name().equalsIgnoreCase("ENUM"))
{
if(dt.name().equalsIgnoreCase("ENUM") || dt.name().equalsIgnoreCase("SET")) {
// Dont try to get precision/scale for enums
}
else if(parsedDataType.contains("(") && parsedDataType.contains(")") && parsedDataType.contains(",") ) {
Expand Down Expand Up @@ -736,8 +731,12 @@ public void enterRenameTable(MySqlParser.RenameTableContext renameTableContext)
originalTableName = renameTableContextChildren.get(0).getText();
newTableName = renameTableContextChildren.get(2).getText();
// If the table name already includes the database name dont include it in the query.
if(originalTableName.contains(".")) {
this.query.append(originalTableName).append(" to ").append(newTableName);
if(originalTableName.contains(".") && newTableName.contains(".")) {
// Split database and table name.
String[] databaseAndTableNameArray = originalTableName.split("\\.");
String[] newDatabaseAndTableNameArray = newTableName.split("\\.");
this.query.append(this.databaseName).append(".").append(databaseAndTableNameArray[1]).append(" to ").
append(this.databaseName).append(".").append(newDatabaseAndTableNameArray[1]);
} else
this.query.append(databaseName).append(".").append(originalTableName).append(" to ").
append(databaseName).append(".").append(newTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void testPgOutputPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
//defaultProps.setProperty("ddl.retry", "true");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -136,7 +135,7 @@
// query clickhouse connection and get data for test_table1 and test_table2


ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience");

Check failure on line 138 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

MariaDBIT.testMultipleDatabases

Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build)) , server ClickHouseNode [uri=http://localhost:32852/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@376799774
Raw output
java.sql.BatchUpdateException: 
Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build))
, server ClickHouseNode [uri=http://localhost:32852/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@376799774
	at com.altinity.clickhouse.debezium.embedded.MariaDBIT.testMultipleDatabases(MariaDBIT.java:138)
// Validate the data
boolean recordFound = false;
while(rs.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ public void testMySQLGeneratedColumns() throws Exception {
Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -129,7 +128,7 @@
// query clickhouse connection and get data for test_table1 and test_table2


ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience");

Check failure on line 131 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

MySQLJsonIT.testMultipleDatabases

Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build)) , server ClickHouseNode [uri=http://localhost:32850/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@769826784
Raw output
java.sql.BatchUpdateException: 
Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build))
, server ClickHouseNode [uri=http://localhost:32850/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@769826784
	at com.altinity.clickhouse.debezium.embedded.MySQLJsonIT.testMultipleDatabases(MySQLJsonIT.java:131)
// Validate the data
boolean recordFound = false;
while(rs.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading
Loading