From 85fd98c443e7a6a9eccf08e4828ae8a4dcd3abf6 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 8 Sep 2024 18:50:10 -0400 Subject: [PATCH 1/2] Added support for SQL server --- .../docker/config_sql_server.yml | 48 ++++++++++ .../docker/data_sql_server/MyTable.csv | 3 + .../docker/data_sql_server/MyTable.sql | 8 ++ .../docker/data_sql_server/entrypoint.sh | 28 ++++++ .../docker/data_sql_server/init.sql | 4 + .../docker/docker-compose-sql-server.yaml | 90 ++++++++++++++++++ sink-connector-lightweight/pom.xml | 8 ++ .../src/test/resources/ssl/README.md | 57 +++++++++++ .../src/test/resources/ssl/truststore.ks | Bin 0 -> 1158 bytes 9 files changed, 246 insertions(+) create mode 100644 sink-connector-lightweight/docker/config_sql_server.yml create mode 100644 sink-connector-lightweight/docker/data_sql_server/MyTable.csv create mode 100644 sink-connector-lightweight/docker/data_sql_server/MyTable.sql create mode 100644 sink-connector-lightweight/docker/data_sql_server/entrypoint.sh create mode 100644 sink-connector-lightweight/docker/data_sql_server/init.sql create mode 100644 sink-connector-lightweight/docker/docker-compose-sql-server.yaml create mode 100644 sink-connector-lightweight/src/test/resources/ssl/README.md create mode 100644 sink-connector-lightweight/src/test/resources/ssl/truststore.ks diff --git a/sink-connector-lightweight/docker/config_sql_server.yml b/sink-connector-lightweight/docker/config_sql_server.yml new file mode 100644 index 000000000..17d15db7f --- /dev/null +++ b/sink-connector-lightweight/docker/config_sql_server.yml @@ -0,0 +1,48 @@ +name: "debezium-embedded-sql-server" +database.hostname: "sql-server-db" +database.port: "1433" +database.user: "sa" +database.password: "Root1234$$" +database.names: "Prime" +table.include.list: "MyTable" +clickhouse.server.url: "clickhouse" +clickhouse.server.user: "root" +clickhouse.server.pass: "root" +clickhouse.server.port: "8123" +clickhouse.server.database: "test" +database.allowPublicKeyRetrieval: "true" +snapshot.mode: "initial" +offset.flush.interval.ms: 5000 +connector.class: "io.debezium.connector.sqlserver.SqlServerConnector" +offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore" +offset.storage.offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info" +offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector" +offset.storage.jdbc.user: "root" +offset.storage.jdbc.password: "root" +offset.storage.offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s +( + `id` String, + `offset_key` String, + `offset_val` String, + `record_insert_ts` DateTime, + `record_insert_seq` UInt64, + `_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)) +) +ENGINE = ReplacingMergeTree(_version) +ORDER BY id +SETTINGS index_granularity = 8198" +offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1" +schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory" +schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector" +schema.history.internal.jdbc.user: "root" +schema.history.internal.jdbc.password: "root" +schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s +(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id" + +schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history" +enable.snapshot.ddl: "true" +auto.create.tables: "true" +database.dbname: "public" +database.ssl.truststore: "${project.basedir}/src/test/resources/ssl" +database.ssl.truststore.password: "debezium" +database.trustServerCertificate: "true" \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/MyTable.csv b/sink-connector-lightweight/docker/data_sql_server/MyTable.csv new file mode 100644 index 000000000..4b02e279f --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/MyTable.csv @@ -0,0 +1,3 @@ +id,value +1,yes +2,it works:) \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/MyTable.sql b/sink-connector-lightweight/docker/data_sql_server/MyTable.sql new file mode 100644 index 000000000..bb0cfa830 --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/MyTable.sql @@ -0,0 +1,8 @@ +USE Prime; +GO + +CREATE TABLE MyTable( + Id nvarchar(max), + Value nvarchar(max) +); +GO \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh b/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh new file mode 100644 index 000000000..12e1155e6 --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/entrypoint.sh @@ -0,0 +1,28 @@ +#!/bin/bash +database=Prime +wait_time=15s +password=Root1234$$ + +# wait for SQL Server to come up +echo importing data will start in $wait_time... +sleep $wait_time +echo importing data... + +# run the init script to create the DB and the tables in /table +/opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i ./init.sql + +for entry in "*.sql" +do + echo executing $entry + /opt/mssql-tools/bin/sqlcmd -S 0.0.0.0 -U sa -P $password -i $entry +done + +#import the data from the csv files +for entry in "*.csv" +do + # i.e: transform /data/MyTable.csv to MyTable + shortname=$(echo $entry | cut -f 1 -d '.' | cut -f 2 -d '/') + tableName=$database.dbo.$shortname + echo importing $tableName from $entry + /opt/mssql-tools/bin/bcp $tableName in $entry -c -t',' -F 2 -S 0.0.0.0 -U sa -P $password +done \ No newline at end of file diff --git a/sink-connector-lightweight/docker/data_sql_server/init.sql b/sink-connector-lightweight/docker/data_sql_server/init.sql new file mode 100644 index 000000000..0d0e7fdc0 --- /dev/null +++ b/sink-connector-lightweight/docker/data_sql_server/init.sql @@ -0,0 +1,4 @@ +DROP DATABASE Prime + +CREATE DATABASE Prime; +GO \ No newline at end of file diff --git a/sink-connector-lightweight/docker/docker-compose-sql-server.yaml b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml new file mode 100644 index 000000000..1f92fa95a --- /dev/null +++ b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml @@ -0,0 +1,90 @@ +version: "3.4" + +# Ubuntu , set this for redpanda to start +# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm + +# Clickhouse Table Schema +# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id; + +services: + + sql-server-db: + container_name: sql-server-db + image: mcr.microsoft.com/mssql/server:2022-latest + #command: sh -c ' chmod +x /tmp/entrypoint.sh; /tmp/./entrypoint.sh & /opt/mssql/bin/sqlservr;' + ports: + - "1433:1433" + environment: + MSSQL_SA_PASSWORD: "Root1234$$" + ACCEPT_EULA: "Y" + volumes: + - ./data_sqlserver:/tmp + + clickhouse: + # clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test + container_name: clickhouse + image: clickhouse/clickhouse-server:latest + restart: "no" + ports: + - "8123:8123" + - "9000:9000" + environment: + - CLICKHOUSE_USER=root + - CLICKHOUSE_PASSWORD=root + - CLICKHOUSE_DB=test + - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=0 + ulimits: + nofile: + soft: "262144" + hard: "262144" + volumes: + #- ../sql/init_ch.sql:/docker-entrypoint-initdb.d/init_clickhouse.sql + - ../clickhouse/users.xml:/etc/clickhouse-server/users.xml + + debezium-embedded: + image: registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${SINK_LIGHTWEIGHT_VERSION} + # build: + # context: ../ + restart: "no" + ports: + - "8083:8083" + - "5005:5005" + - "7000:7000" + depends_on: + - clickhouse + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + #- ./data:/data + - ./config_sql_server.yml:/config.yml + + ### MONITORING #### + prometheus: + container_name: prometheus + image: bitnami/prometheus:2.36.0 + restart: "no" + ports: + - "9090:9090" + volumes: + - ./config/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml + + grafana: + build: + context: ./config/grafana + args: + GRAFANA_VERSION: latest + #container_name: grafana + #image: grafana/grafana + restart: "no" + #volumes: + # - ../config/grafana/dashboards:/etc/grafana/provisioning/dashboards + ports: + - "3000:3000" + environment: + - DS_PROMETHEUS=prometheus + - GF_USERS_DEFAULT_THEME=light + - GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource + - GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource + depends_on: + - prometheus + ## END OF MONITORING ### \ No newline at end of file diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 250bbd953..154aec166 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -113,6 +113,11 @@ debezium-connector-postgres ${version.debezium} + + io.debezium + debezium-connector-sqlserver + ${version.debezium} + org.postgresql postgresql @@ -539,6 +544,9 @@ *:* org/apache/log4j/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA diff --git a/sink-connector-lightweight/src/test/resources/ssl/README.md b/sink-connector-lightweight/src/test/resources/ssl/README.md new file mode 100644 index 000000000..4ba59537b --- /dev/null +++ b/sink-connector-lightweight/src/test/resources/ssl/README.md @@ -0,0 +1,57 @@ +This directory contains the truststore (used for validating DB server certificate). + +The files are generated based on the certificates in src/test/docker, which are generated following the +guidelines outlined in https://docs.microsoft.com/en-us/sql/linux/sql-server-linux-docker-container-security?view=sql-server-ver16. + +To generate the server certificate and key: + +``` +openssl req -x509 -nodes -newkey rsa:2048 -subj '/CN=localhost' -keyout mssql.key -out mssql.pem -days 3650 +``` + +This will output two files, `mssql.key` and `mssql.pem`. + +These files are mounted to the SQL Server docker container and therefore the correct permissions must be set +on these files so that they can be read properly by SQL Server's database process. To set the right permissions, +you need to run the following: + +``` +chmod 444 mssql.key +chmod 444 mssql.pem +``` + +Note, we explicitly use `444` rather than `440` since the volume is mounted as root but the SQL Server database +is started by the `mssql` user, this allows the key and certificate to be read. + +In addition, a special `mssql.conf` file will also be mounted in order for Microsoft SQL Server to know where +to read the database certificates from. This configuration file will be mounted automatically to +`/var/opt/mssql/mssql.conf`. The contents of this file is as follows: + +``` +[network] +tlscert = /etc/ssl/debezium/certs/mssql.pem +tlskey = /etc/ssl/debezium/private/mssql.key +tlsprotocols = 1.2 +forceencryption = 1 +``` + +This configuration enforces TLS 1.2 and encryption in order to operate with the SQL Server instance. +If the client does not support encryption or cannot negotiate TLS 1.2, the client connection will be rejected. + +Finally, the truststore is generated as follows: + +``` +keytool -import -v -trustcacerts -alias localhost -file ../../docker/mssql.pem -keystore truststore.ks -storepass debezium -noprompt +``` + +This imports the `mssql.pem` public certificate into the truststore that we mount into the container and +this truststore will be configured as part of the SQL Server connector's configuration using: + +``` +database.ssl.truststore=${project.basedir}/src/test/resources/ssl +database.ssl.truststore.password=debezium +database.trustServerCertificate=true +``` + +We specifically set `trustServerCertificate` because this certificate is self-signed. +This certificate is also valid for 10 years from August 9th, 2022, so expires August 9th, 2032. \ No newline at end of file diff --git a/sink-connector-lightweight/src/test/resources/ssl/truststore.ks b/sink-connector-lightweight/src/test/resources/ssl/truststore.ks new file mode 100644 index 0000000000000000000000000000000000000000..8c5e7920187dc4507acd66b9fd13a21989b621c7 GIT binary patch literal 1158 zcmV;11bO=~f&_vB0Ru3C1S|##Duzgg_YDCD0ic2e9Rz{|88Ctb6)=JX5e5k=hDe6@ z4FLxRpn?PfFoFaA0s#Opf&=#k2`Yw2hW8Bt2LUi<1_>&LNQU+thDZTr0|Wso1Q4N=An*)y$}|L;Y1C4c5omyd1AvtT|3jLv@eN1vcs$-^yvsPW1AASzN|NUiWK z=U=K^xdf;O?K%Q455S0c5!80i@PrIpU@I_kC!2c2*F69nM22Vb5PyN`aukx7QQ|=5G?2l{)3>Gb_`C)!COGlao2&sj$lI$J z`DjK-o^en1l?sV+z^57&)(VfbEyJQhvpsjg#x@vCTQUHj)n*uUj9JS{wkp`PK)M48 zI^ZMR)*~azD3i}*-SJH78<)bpXI=O3)U^XCdkY$kzXw=(1*3Cu4@qq0Lx_xb~V^A7L_=TMD0Y9iw^q z?tw3Fx{v$*;Mi7$5+%}^JEXW*FRfZR?kxHl?HXoJ^nDVluV8fjfpFvYg=JsE)lhVx z=|mhgomU(PPC&68|M|ORn|l{UaH0tQ@|6><>R?ru1C}xRp~NFQ5=KQTp3JviHuT1D zJ{{5K7{38LlI)AB8B5Nuu*^o+qjwW?S$8L<%@O@m)58Rv}dG-wT|O!Jhb;hAFtRdYR!<{x{*$@}II= z<;8XU0s>MSRnGzr4G9cu>rmRvpLB>9p*6f;MCddq(P+2k@-8e_d+_Tmt7)aDqI>t) z9V6#HxurFx2tu85`dy37x3Whcm%UEp9b$)PMUopTDtv;0$XAy21@r;4#s>ah7|LYV z{mz`TOW$+BwI!2vTb{fMuJ5AlQzGOtcT=m>!=BVB!OV<-w%lL@fx`rq4!bZ-FflL< z1_@w>NC9O71OfpC00bb=Yx>bxDn^6a_A*pUADCR|LH_4=PBOQBe;7KG*a9&G6o_6t Y$+EvLFiZ`**|IJ`gFFga=mG*K5L&Pg&;S4c literal 0 HcmV?d00001 From 07567c02f595b655fed9248eb9c57f59b6d788c0 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 24 Sep 2024 19:36:06 -0400 Subject: [PATCH 2/2] Fixed SQL server integration test --- .../docker/docker-compose-sql-server.yaml | 82 +++++------ sink-connector-lightweight/pom.xml | 6 + .../debezium/embedded/SQLServerIT.java | 127 ++++++++++++++++++ 3 files changed, 163 insertions(+), 52 deletions(-) create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java diff --git a/sink-connector-lightweight/docker/docker-compose-sql-server.yaml b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml index 1f92fa95a..115c51a8d 100644 --- a/sink-connector-lightweight/docker/docker-compose-sql-server.yaml +++ b/sink-connector-lightweight/docker/docker-compose-sql-server.yaml @@ -21,70 +21,48 @@ services: - ./data_sqlserver:/tmp clickhouse: - # clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test - container_name: clickhouse - image: clickhouse/clickhouse-server:latest - restart: "no" - ports: - - "8123:8123" - - "9000:9000" - environment: - - CLICKHOUSE_USER=root - - CLICKHOUSE_PASSWORD=root - - CLICKHOUSE_DB=test - - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=0 - ulimits: - nofile: - soft: "262144" - hard: "262144" - volumes: - #- ../sql/init_ch.sql:/docker-entrypoint-initdb.d/init_clickhouse.sql - - ../clickhouse/users.xml:/etc/clickhouse-server/users.xml + extends: + file: clickhouse-service.yml + service: clickhouse + depends_on: + zookeeper: + condition: service_healthy - debezium-embedded: - image: registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${SINK_LIGHTWEIGHT_VERSION} - # build: - # context: ../ - restart: "no" - ports: - - "8083:8083" - - "5005:5005" - - "7000:7000" + zookeeper: + extends: + file: zookeeper-service.yml + service: zookeeper + + clickhouse-sink-connector-lt: + extends: + file: clickhouse-sink-connector-lt-service.yml + service: clickhouse-sink-connector-lt depends_on: - clickhouse extra_hosts: - "host.docker.internal:host-gateway" + environment: + JAVA_OPTS: > + -Xmx5G + -Xms128m volumes: - #- ./data:/data - ./config_sql_server.yml:/config.yml ### MONITORING #### prometheus: - container_name: prometheus - image: bitnami/prometheus:2.36.0 - restart: "no" - ports: - - "9090:9090" - volumes: - - ./config/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml + extends: + file: prometheus-service.yml + service: prometheus + grafana: - build: - context: ./config/grafana - args: - GRAFANA_VERSION: latest - #container_name: grafana - #image: grafana/grafana - restart: "no" - #volumes: - # - ../config/grafana/dashboards:/etc/grafana/provisioning/dashboards - ports: - - "3000:3000" - environment: - - DS_PROMETHEUS=prometheus - - GF_USERS_DEFAULT_THEME=light - - GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource - - GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource + extends: + file: grafana-service.yml + service: grafana + volumes: + - ./config/grafana/config/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml + - ./config/grafana/config/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml + - ./config/grafana/config/altinity_sink_connector.json:/var/lib/grafana/dashboards/altinity_sink_connector.json depends_on: - prometheus ## END OF MONITORING ### \ No newline at end of file diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 154aec166..af7eb7574 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -279,6 +279,12 @@ ${version.testcontainers} test + + org.testcontainers + mssqlserver + ${version.testcontainers} + test + diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java new file mode 100644 index 000000000..76d2e7fc4 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/SQLServerIT.java @@ -0,0 +1,127 @@ +package com.altinity.clickhouse.debezium.embedded; + + +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.clickhouse.jdbc.ClickHouseConnection; +import org.junit.Assert; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static com.altinity.clickhouse.debezium.embedded.PostgresProperties.getDefaultProperties; + +@Testcontainers +public class SQLServerIT { + + @Container + public static org.testcontainers.clickhouse.ClickHouseContainer clickHouseContainer = + new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_it.sql") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + + @Container + public static MSSQLServerContainer sqlServerContainer = new MSSQLServerContainer<> + (DockerImageName.parse("mcr.microsoft.com/mssql/server:2019-latest") + .asCompatibleSubstituteFor("sqlserver")) + .withInitScript("init_sqlserver.sql") + .withUsername("sa") + .withPassword("Password!") + .withDatabaseName("employees") + .withExposedPorts(1433); + + + public Properties getProperties() throws Exception { + + Properties properties = new Properties(); +// Properties properties = getDefaultProperties(postgreSQLContainer, clickHouseContainer); +// properties.put("plugin.name", "decoderbufs"); +// properties.put("plugin.path", "/"); +// properties.put("table.include.list", "public.tm"); +// properties.put("slot.max.retries", "6"); +// properties.put("slot.retry.delay.ms", "5000"); +// properties.put("database.allowPublicKeyRetrieval", "true"); +// properties.put("table.include.list", "public.tm,public.tm2"); + + return properties; + } + + @Test + @DisplayName("Integration Test - Validates PostgreSQL replication when the plugin is set to DecoderBufs") + public void testDecoderBufsPlugin() throws Exception { + Network network = Network.newNetwork(); + + sqlServerContainer.withNetwork(network).start(); + clickHouseContainer.withNetwork(network).start(); + Thread.sleep(10000); + + org.testcontainers.Testcontainers.exposeHostPorts(sqlServerContainer.getFirstMappedPort()); + AtomicReference engine = new AtomicReference<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(getProperties(), new SourceRecordParserService(), + new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), + "employees"), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(10000);// + Thread.sleep(50000); + + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "public"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + Map tmColumns = writer.getColumnsDataTypesForTable("tm"); + Assert.assertTrue(tmColumns.size() == 22); + Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); + Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)")); + //Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))")); + Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))")); + + + int tmCount = 0; + ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery(); + while(chRs.next()) { + tmCount = chRs.getInt(1); + } + + Assert.assertTrue(tmCount == 2); + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + } +}