From e8cef17a6fafe25d12ee338ce0fe0c86827e9c0d Mon Sep 17 00:00:00 2001 From: IlyaTsoi <45252974+IlyaTsoi@users.noreply.github.com> Date: Tue, 24 Oct 2023 23:49:55 +0300 Subject: [PATCH 1/3] Get last element as a table name from a topic name It would be better if it didn't depend on the hard-coded .. template --- .../clickhouse/sink/connector/common/Utils.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java index 079961470..5418b76b5 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java @@ -88,13 +88,9 @@ public static Map parseTopicToTableMap(String input) throws Exce public static String getTableNameFromTopic(String topicName) { String tableName = null; - - // topic names is of the following format. - // hostname.dbName.tableName + // get last element after deviding topicName by "." as tableName String[] splitName = topicName.split("\\."); - if(splitName.length == 3) { - tableName = splitName[2]; - } + tableName = splitName[splitName.length - 1]; return tableName; } @@ -108,4 +104,4 @@ public static boolean isValidTable(String tableName) { return true; } -} \ No newline at end of file +} From 513664484ec90b7b57e9e64995cc2c59994a10e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=98=D0=BB=D1=8C=D1=8F=20=D0=97=D0=B0=D1=85=D0=B0=D1=80?= =?UTF-8?q?=D0=BE=D0=B2=20=28ilia=2Ezakharov=29?= Date: Wed, 25 Oct 2023 12:48:48 +0300 Subject: [PATCH 2/3] I think, that the current code doesn't work properly. It runs ClickhouseBatchRunnable sequently, instead of parallel. We should create a few ClickhouseBatchRunnable tasks by scheduleAtFixedRate method to make them work in parallel. Such behavior for single task is described in docs for ScheduledThreadPoolExecutor for method scheduleAtFixedRate. --- .../clickhouse/sink/connector/ClickHouseSinkTask.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java index 2ca580a54..85adc7bcc 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java @@ -67,10 +67,13 @@ public void start(Map config) { this.id = "task-" + this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString()); this.records = new ConcurrentHashMap<>(); - ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(this.records, this.config, topic2TableMap); - this.executor = new ClickHouseBatchExecutor(this.config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString())); - this.executor.scheduleAtFixedRate(runnable, 0, this.config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); - + long runIntervalMs = this.config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()); + int threadPoolSize = this.config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); + this.executor = new ClickHouseBatchExecutor(threadPoolSize); + for (int i = 0; i < threadPoolSize; i++) { + ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(this.records, this.config, topic2TableMap); + this.executor.scheduleAtFixedRate(runnable, 0, runIntervalMs, TimeUnit.MILLISECONDS); + } this.deduplicator = new DeDuplicator(this.config); } From 78d94316fab7c762d2ea04462aa0de6cfd0fc5bf Mon Sep 17 00:00:00 2001 From: IlyaTsoi Date: Mon, 30 Oct 2023 16:55:35 +0300 Subject: [PATCH 3/3] Revert "Get last element as a table name from a topic name" This reverts commit e8cef17a6fafe25d12ee338ce0fe0c86827e9c0d. --- .../clickhouse/sink/connector/common/Utils.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java index 5418b76b5..079961470 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Utils.java @@ -88,9 +88,13 @@ public static Map parseTopicToTableMap(String input) throws Exce public static String getTableNameFromTopic(String topicName) { String tableName = null; - // get last element after deviding topicName by "." as tableName + + // topic names is of the following format. + // hostname.dbName.tableName String[] splitName = topicName.split("\\."); - tableName = splitName[splitName.length - 1]; + if(splitName.length == 3) { + tableName = splitName[2]; + } return tableName; } @@ -104,4 +108,4 @@ public static boolean isValidTable(String tableName) { return true; } -} +} \ No newline at end of file