From 429d8d22f43c31fa997da307f73cd686c86b0bb6 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:47:55 +0200 Subject: [PATCH 1/3] code fixes --- .../history/BigquerySchemaHistory.java | 47 +++++++------- .../offset/BigqueryOffsetBackingStore.java | 63 +++++++++++-------- 2 files changed, 60 insertions(+), 50 deletions(-) diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java index 60d4867..c4ff575 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java @@ -8,6 +8,8 @@ package io.debezium.server.bigquery.history; +import autovalue.shaded.com.google.common.collect.ImmutableList; +import com.google.cloud.bigquery.*; import io.debezium.DebeziumException; import io.debezium.annotation.ThreadSafe; import io.debezium.common.annotation.Incubating; @@ -18,6 +20,9 @@ import io.debezium.server.bigquery.BatchUtil; import io.debezium.util.FunctionalReadWriteLock; import io.debezium.util.Strings; +import org.eclipse.microprofile.config.ConfigProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.File; @@ -25,17 +30,14 @@ import java.nio.file.Files; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import autovalue.shaded.com.google.common.collect.ImmutableList; -import com.google.cloud.bigquery.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A {@link SchemaHistory} implementation that stores the schema history to database table * @@ -241,45 +243,42 @@ private void loadFileSchemaHistory(File file) { } public static class BigquerySchemaHistoryConfig { - private final Configuration config; + Properties configCombined = new Properties(); public BigquerySchemaHistoryConfig(Configuration config) { - this.config = config; - } - - private String getConfig(String configName, String fallbackConfigName, String defaultValue) { - return this.config.getString(configName, this.config.getString(fallbackConfigName, defaultValue)); + Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.", true); + confIcebergSubset1.forEach(configCombined::put); + Configuration confIcebergSubset2 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.", true); + confIcebergSubset2.forEach(configCombined::putIfAbsent); + // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! + Map confIcebergSubset3 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerybatch."); + confIcebergSubset3.forEach(configCombined::putIfAbsent); + Map confIcebergSubset4 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerystream."); + confIcebergSubset4.forEach(configCombined::putIfAbsent); } public String getBigqueryProject() { - return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.project", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.project", null); + return (String) configCombined.getOrDefault("project", null); } public String getBigqueryDataset() { - return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.dataset", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.dataset", null); + return (String) configCombined.getOrDefault("dataset", null); } public String getBigqueryTable() { - return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.table-name", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.table-name", "debezium_database_history_storage" - ); + return (String) configCombined.getOrDefault("bigquery.table-name", "debezium_database_history_storage"); } public String getMigrateHistoryFile() { - return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.migrate-history-file", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.migrate-history-file", ""); + return (String) configCombined.getOrDefault("bigquery.migrate-history-file", ""); } public String getBigqueryCredentialsFile() { - return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.credentials-file", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.credentials-file", ""); + return (String) configCombined.getOrDefault("credentials-file", ""); } public String getBigqueryLocation() { - return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.location", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.location", "US"); + return (String) configCombined.getOrDefault("location", "US"); } } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java index 40f1e41..b471b8e 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java @@ -8,11 +8,26 @@ package io.debezium.server.bigquery.offset; +import autovalue.shaded.com.google.common.collect.ImmutableList; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.bigquery.*; import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.server.bigquery.BatchUtil; import io.debezium.util.Strings; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; +import org.apache.kafka.connect.storage.OffsetBackingStore; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.SafeObjectInputStream; +import org.eclipse.microprofile.config.ConfigProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -24,21 +39,6 @@ import java.util.*; import java.util.concurrent.Future; -import autovalue.shaded.com.google.common.collect.ImmutableList; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.bigquery.*; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.runtime.WorkerConfig; -import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; -import org.apache.kafka.connect.storage.OffsetBackingStore; -import org.apache.kafka.connect.util.Callback; -import org.apache.kafka.connect.util.SafeObjectInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Implementation of OffsetBackingStore that saves data to database table. */ @@ -112,8 +112,8 @@ private void initializeTable() throws SQLException { LOG.debug("Creating table {} to store offset", tableFullName); BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName)); LOG.warn("Created offset storage table {} to store offset", tableFullName); - - if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())){ + + if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())) { LOG.warn("Loading offset from file {}", offsetConfig.getMigrateOffsetFile()); this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile())); } @@ -166,10 +166,10 @@ private void load() { private void loadFileOffset(File file) { try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) { Object obj = is.readObject(); - + if (!(obj instanceof HashMap)) throw new ConnectException("Expected HashMap but found " + obj.getClass()); - + Map raw = (Map) obj; for (Map.Entry mapEntry : raw.entrySet()) { ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; @@ -179,10 +179,11 @@ private void loadFileOffset(File file) { } catch (IOException | ClassNotFoundException e) { throw new DebeziumException("Failed migrating offset from file", e); } - + LOG.warn("Loaded file offset, saving it to Bigquery offset storage"); save(); } + @Override public Future set(final Map values, final Callback callback) { @@ -226,6 +227,7 @@ public Set> connectorPartitions(String connectorName) { public static class BigqueryOffsetBackingStoreConfig extends WorkerConfig { private final Configuration config; + Properties configCombined = new Properties(); static final Field SINK_TYPE_FIELD = Field.create("debezium.sink.type").optional(); static final Field SINK_TYPE_FIELD_FALLBACK = Field.create("name").optional(); @@ -233,6 +235,15 @@ public static class BigqueryOffsetBackingStoreConfig extends WorkerConfig { public BigqueryOffsetBackingStoreConfig(Configuration config) { super(new ConfigDef(), config.asMap()); this.config = config; + Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.", true); + confIcebergSubset1.forEach(configCombined::put); + Configuration confIcebergSubset2 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.", true); + confIcebergSubset2.forEach(configCombined::putIfAbsent); + // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! + Map confIcebergSubset3 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerybatch."); + confIcebergSubset3.forEach(configCombined::putIfAbsent); + Map confIcebergSubset4 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerystream."); + confIcebergSubset4.forEach(configCombined::putIfAbsent); } public String sinkType() { @@ -245,27 +256,27 @@ public String sinkType() { } public String getBigqueryProject() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.project", this.sinkType()))); + return (String) this.configCombined.getOrDefault("project", null); } public String getBigqueryDataset() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.dataset", this.sinkType()))); + return (String) this.configCombined.getOrDefault("dataset", null); } public String getBigqueryTable() { - return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.table-name").withDefault("debezium_offset_storage")); + return (String) this.configCombined.getOrDefault("bigquery.table-name", "debezium_offset_storage"); } public String getMigrateOffsetFile() { - return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.migrate-offset-file").withDefault("")); + return (String) this.configCombined.getOrDefault("bigquery.migrate-offset-file", ""); } public String getBigqueryCredentialsFile() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.credentialsFile", this.sinkType())).withDefault("")); + return (String) this.configCombined.getOrDefault("credentialsFile", ""); } public String getBigqueryLocation() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.location", this.sinkType())).withDefault("US")); + return (String) this.configCombined.getOrDefault("location", "US"); } } From d8a0e42e87fee2b932616142091a4bdbda9f942a Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:44:08 +0200 Subject: [PATCH 2/3] Upgrade jandex-maven-plugin version --- .../debezium/server/bigquery/BatchUtil.java | 31 ++++++++++++----- .../history/BigquerySchemaHistory.java | 12 +++---- .../offset/BigqueryOffsetBackingStore.java | 34 ++++++------------- 3 files changed, 37 insertions(+), 40 deletions(-) diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java index 4092b96..135034e 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java @@ -8,7 +8,17 @@ package io.debezium.server.bigquery; +import com.google.api.gax.retrying.RetrySettings; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.*; import io.debezium.DebeziumException; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.literal.NamedLiteral; +import org.eclipse.microprofile.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.FileInputStream; import java.io.IOException; @@ -18,15 +28,6 @@ import java.util.Map; import java.util.Optional; -import com.google.api.gax.retrying.RetrySettings; -import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.bigquery.*; -import jakarta.enterprise.inject.Instance; -import jakarta.enterprise.inject.literal.NamedLiteral; -import org.eclipse.microprofile.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * * @author Ismail Simsek @@ -34,6 +35,9 @@ public class BatchUtil { protected static final Logger LOGGER = LoggerFactory.getLogger(BatchUtil.class); + static final io.debezium.config.Field SINK_TYPE_FIELD = io.debezium.config.Field.create("debezium.sink.type").optional(); + static final io.debezium.config.Field SINK_TYPE_FIELD_FALLBACK = Field.create("name").optional(); + public static Map getConfigSubset(Config config, String prefix) { final Map ret = new HashMap<>(); @@ -47,6 +51,15 @@ public static Map getConfigSubset(Config config, String prefix) return ret; } + + public static String sinkType(Configuration config) { + String type = config.getString(SINK_TYPE_FIELD, config.getString(SINK_TYPE_FIELD_FALLBACK)); + if (type == null) { + throw new DebeziumException("The config property debezium.sink.type is required " + "but it could not be found in any config source"); + } + return type; + } + public static T selectInstance(Instance instances, String name) { Instance instance = instances.select(NamedLiteral.of(name)); diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java index c4ff575..6ef0584 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java @@ -246,17 +246,15 @@ public static class BigquerySchemaHistoryConfig { Properties configCombined = new Properties(); public BigquerySchemaHistoryConfig(Configuration config) { - Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.", true); + String sinkType = BatchUtil.sinkType(config); + Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + sinkType + ".", true); confIcebergSubset1.forEach(configCombined::put); - Configuration confIcebergSubset2 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.", true); - confIcebergSubset2.forEach(configCombined::putIfAbsent); // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! - Map confIcebergSubset3 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerybatch."); - confIcebergSubset3.forEach(configCombined::putIfAbsent); - Map confIcebergSubset4 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerystream."); - confIcebergSubset4.forEach(configCombined::putIfAbsent); + Map confIcebergSubset2 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); + confIcebergSubset2.forEach(configCombined::putIfAbsent); } + public String getBigqueryProject() { return (String) configCombined.getOrDefault("project", null); } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java index b471b8e..61572de 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java @@ -226,7 +226,6 @@ public Set> connectorPartitions(String connectorName) { } public static class BigqueryOffsetBackingStoreConfig extends WorkerConfig { - private final Configuration config; Properties configCombined = new Properties(); static final Field SINK_TYPE_FIELD = Field.create("debezium.sink.type").optional(); @@ -234,49 +233,36 @@ public static class BigqueryOffsetBackingStoreConfig extends WorkerConfig { public BigqueryOffsetBackingStoreConfig(Configuration config) { super(new ConfigDef(), config.asMap()); - this.config = config; - Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.", true); + String sinkType = BatchUtil.sinkType(config); + Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + sinkType + ".", true); confIcebergSubset1.forEach(configCombined::put); - Configuration confIcebergSubset2 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.", true); - confIcebergSubset2.forEach(configCombined::putIfAbsent); // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! - Map confIcebergSubset3 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerybatch."); - confIcebergSubset3.forEach(configCombined::putIfAbsent); - Map confIcebergSubset4 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.bigquerystream."); - confIcebergSubset4.forEach(configCombined::putIfAbsent); - } - - public String sinkType() { - String type = this.config.getString(SINK_TYPE_FIELD, this.config.getString(SINK_TYPE_FIELD_FALLBACK)); - if (type == null) { - throw new DebeziumException("The config property debezium.sink.type is required " + - "but it could not be found in any config source"); - } - return type; + Map confIcebergSubset2 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); + confIcebergSubset2.forEach(configCombined::putIfAbsent); } public String getBigqueryProject() { - return (String) this.configCombined.getOrDefault("project", null); + return (String) configCombined.getOrDefault("project", null); } public String getBigqueryDataset() { - return (String) this.configCombined.getOrDefault("dataset", null); + return (String) configCombined.getOrDefault("dataset", null); } public String getBigqueryTable() { - return (String) this.configCombined.getOrDefault("bigquery.table-name", "debezium_offset_storage"); + return (String) configCombined.getOrDefault("bigquery.table-name", "debezium_offset_storage"); } public String getMigrateOffsetFile() { - return (String) this.configCombined.getOrDefault("bigquery.migrate-offset-file", ""); + return (String) configCombined.getOrDefault("bigquery.migrate-offset-file", ""); } public String getBigqueryCredentialsFile() { - return (String) this.configCombined.getOrDefault("credentialsFile", ""); + return (String) configCombined.getOrDefault("credentials-file", ""); } public String getBigqueryLocation() { - return (String) this.configCombined.getOrDefault("location", "US"); + return (String) configCombined.getOrDefault("location", "US"); } } From 49bbe5d300a1238dcd61618e559f033cf7758607 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:47:31 +0200 Subject: [PATCH 3/3] Upgrade jandex-maven-plugin version --- .../server/bigquery/AbstractChangeConsumer.java | 2 +- .../bigquery/BatchBigqueryChangeConsumer.java | 2 +- .../{BatchUtil.java => ConsumerUtil.java} | 8 ++++---- .../bigquery/StreamBigqueryChangeConsumer.java | 2 +- .../bigquery/history/BigquerySchemaHistory.java | 16 ++++++++-------- .../offset/BigqueryOffsetBackingStore.java | 16 ++++++++-------- ...{BatchUtilTest.java => ConsumerUtilTest.java} | 10 +++++----- 7 files changed, 28 insertions(+), 28 deletions(-) rename debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/{BatchUtil.java => ConsumerUtil.java} (94%) rename debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/{BatchUtilTest.java => ConsumerUtilTest.java} (96%) diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java index 6e50628..fece430 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java @@ -84,7 +84,7 @@ public void initizalize() throws InterruptedException { throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!"); } - batchSizeWait = BatchUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName); + batchSizeWait = ConsumerUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName); LOGGER.info("Using {} to optimize batch size", batchSizeWait.getClass().getSimpleName()); batchSizeWait.initizalize(); } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java index 42f8c9d..3e51736 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java @@ -77,7 +77,7 @@ void connect() throws InterruptedException { public void initizalize() throws InterruptedException { super.initizalize(); - bqClient = BatchUtil.getBQClient(gcpProject, bqDataset, credentialsFile , bqLocation); + bqClient = ConsumerUtil.bigqueryClient(gcpProject, bqDataset, credentialsFile, bqLocation); timePartitioning = TimePartitioning.newBuilder(TimePartitioning.Type.valueOf(partitionType)).setField(partitionField).build(); diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java similarity index 94% rename from debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java rename to debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java index 135034e..8d08fd2 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java @@ -32,8 +32,8 @@ * * @author Ismail Simsek */ -public class BatchUtil { - protected static final Logger LOGGER = LoggerFactory.getLogger(BatchUtil.class); +public class ConsumerUtil { + protected static final Logger LOGGER = LoggerFactory.getLogger(ConsumerUtil.class); static final io.debezium.config.Field SINK_TYPE_FIELD = io.debezium.config.Field.create("debezium.sink.type").optional(); static final io.debezium.config.Field SINK_TYPE_FIELD_FALLBACK = Field.create("name").optional(); @@ -73,7 +73,7 @@ public static T selectInstance(Instance instances, String name) { return instance.get(); } - public static BigQuery getBQClient(Optional gcpProject, Optional bqDataset, Optional credentialsFile, String bqLocation) throws InterruptedException { + public static BigQuery bigqueryClient(Optional gcpProject, Optional bqDataset, Optional credentialsFile, String bqLocation) throws InterruptedException { if (gcpProject.isEmpty()) { throw new InterruptedException("Please provide a value for `debezium.sink.{bigquerybatch|bigquerystream}.project`"); @@ -129,7 +129,7 @@ public static TableResult executeQuery(BigQuery bqClient, String query, List records) { lock.write(() -> { try { if (exists()) { - TableResult rs = BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_STORAGE_TABLE_SELECT, tableFullName)); + TableResult rs = ConsumerUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_STORAGE_TABLE_SELECT, tableFullName)); for (FieldValueList row : rs.getValues()) { String line = row.get("history_data").getStringValue(); if (line == null) { @@ -182,7 +182,7 @@ public boolean exists() { int numRows = 0; try { - TableResult rs = BatchUtil.executeQuery(bqClient, "SELECT COUNT(*) as row_count FROM " + tableFullName); + TableResult rs = ConsumerUtil.executeQuery(bqClient, "SELECT COUNT(*) as row_count FROM " + tableFullName); for (FieldValueList row : rs.getValues()) { numRows = row.get("row_count").getNumericValue().intValue(); break; @@ -203,7 +203,7 @@ public void initializeStorage() { if (!storageExists()) { try { LOG.debug("Creating table {} to store database history", tableFullName); - BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName)); + ConsumerUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName)); LOG.warn("Created database history storage table {} to store history", tableFullName); if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) { @@ -246,11 +246,11 @@ public static class BigquerySchemaHistoryConfig { Properties configCombined = new Properties(); public BigquerySchemaHistoryConfig(Configuration config) { - String sinkType = BatchUtil.sinkType(config); + String sinkType = ConsumerUtil.sinkType(config); Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + sinkType + ".", true); confIcebergSubset1.forEach(configCombined::put); // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! - Map confIcebergSubset2 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); + Map confIcebergSubset2 = ConsumerUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); confIcebergSubset2.forEach(configCombined::putIfAbsent); } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java index 61572de..d9f96c9 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java @@ -16,7 +16,7 @@ import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.config.Field; -import io.debezium.server.bigquery.BatchUtil; +import io.debezium.server.bigquery.ConsumerUtil; import io.debezium.util.Strings; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.errors.ConnectException; @@ -79,7 +79,7 @@ public void configure(WorkerConfig config) { this.offsetConfig = new BigqueryOffsetBackingStoreConfig(Configuration.from(config.originalsStrings())); try { - bqClient = BatchUtil.getBQClient( + bqClient = ConsumerUtil.bigqueryClient( Optional.ofNullable(this.offsetConfig.getBigqueryProject()), Optional.ofNullable(this.offsetConfig.getBigqueryDataset()), Optional.ofNullable(this.offsetConfig.getBigqueryCredentialsFile()), @@ -110,7 +110,7 @@ private void initializeTable() throws SQLException { Table table = bqClient.getTable(tableId); if (table == null) { LOG.debug("Creating table {} to store offset", tableFullName); - BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName)); + ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName)); LOG.warn("Created offset storage table {} to store offset", tableFullName); if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())) { @@ -123,11 +123,11 @@ private void initializeTable() throws SQLException { protected void save() { LOG.debug("Saving offset data to bigquery table..."); try { - BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DELETE, tableFullName)); + ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DELETE, tableFullName)); String dataJson = mapper.writeValueAsString(data); LOG.debug("Saving offset data {}", dataJson); Timestamp currentTs = new Timestamp(System.currentTimeMillis()); - BatchUtil.executeQuery(bqClient, + ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_INSERT, tableFullName), ImmutableList.of( QueryParameterValue.string(UUID.randomUUID().toString()), @@ -145,7 +145,7 @@ protected void save() { private void load() { try { String dataJsonString = null; - TableResult rs = BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_SELECT, tableFullName)); + TableResult rs = ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_SELECT, tableFullName)); for (FieldValueList row : rs.getValues()) { dataJsonString = row.get("offset_data").getStringValue(); break; @@ -233,11 +233,11 @@ public static class BigqueryOffsetBackingStoreConfig extends WorkerConfig { public BigqueryOffsetBackingStoreConfig(Configuration config) { super(new ConfigDef(), config.asMap()); - String sinkType = BatchUtil.sinkType(config); + String sinkType = ConsumerUtil.sinkType(config); Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + sinkType + ".", true); confIcebergSubset1.forEach(configCombined::put); // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! - Map confIcebergSubset2 = BatchUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); + Map confIcebergSubset2 = ConsumerUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); confIcebergSubset2.forEach(configCombined::putIfAbsent); } diff --git a/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/BatchUtilTest.java b/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/ConsumerUtilTest.java similarity index 96% rename from debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/BatchUtilTest.java rename to debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/ConsumerUtilTest.java index 6850b9e..65fba4d 100644 --- a/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/BatchUtilTest.java +++ b/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/ConsumerUtilTest.java @@ -8,24 +8,24 @@ package io.debezium.server.bigquery; +import com.fasterxml.jackson.databind.JsonNode; import io.debezium.serde.DebeziumSerdes; +import org.apache.kafka.common.serialization.Serde; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.kafka.common.serialization.Serde; -import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; -class BatchUtilTest { +class ConsumerUtilTest { final String serdeWithSchema = Files.readString(Path.of("src/test/resources/json/serde-with-schema.json")); final String unwrapWithSchema = Files.readString(Path.of("src/test/resources/json/unwrap-with-schema.json")); - BatchUtilTest() throws IOException { + ConsumerUtilTest() throws IOException { } @Test