diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java index 6e50628..fece430 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/AbstractChangeConsumer.java @@ -84,7 +84,7 @@ public void initizalize() throws InterruptedException { throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!"); } - batchSizeWait = BatchUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName); + batchSizeWait = ConsumerUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName); LOGGER.info("Using {} to optimize batch size", batchSizeWait.getClass().getSimpleName()); batchSizeWait.initizalize(); } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java index 42f8c9d..3e51736 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java @@ -77,7 +77,7 @@ void connect() throws InterruptedException { public void initizalize() throws InterruptedException { super.initizalize(); - bqClient = BatchUtil.getBQClient(gcpProject, bqDataset, credentialsFile , bqLocation); + bqClient = ConsumerUtil.bigqueryClient(gcpProject, bqDataset, credentialsFile, bqLocation); timePartitioning = TimePartitioning.newBuilder(TimePartitioning.Type.valueOf(partitionType)).setField(partitionField).build(); diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java similarity index 81% rename from debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java rename to debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java index 4092b96..8d08fd2 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java @@ -8,7 +8,17 @@ package io.debezium.server.bigquery; +import com.google.api.gax.retrying.RetrySettings; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.*; import io.debezium.DebeziumException; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.literal.NamedLiteral; +import org.eclipse.microprofile.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.FileInputStream; import java.io.IOException; @@ -18,21 +28,15 @@ import java.util.Map; import java.util.Optional; -import com.google.api.gax.retrying.RetrySettings; -import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.bigquery.*; -import jakarta.enterprise.inject.Instance; -import jakarta.enterprise.inject.literal.NamedLiteral; -import org.eclipse.microprofile.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * * @author Ismail Simsek */ -public class BatchUtil { - protected static final Logger LOGGER = LoggerFactory.getLogger(BatchUtil.class); +public class ConsumerUtil { + protected static final Logger LOGGER = LoggerFactory.getLogger(ConsumerUtil.class); + + static final io.debezium.config.Field SINK_TYPE_FIELD = io.debezium.config.Field.create("debezium.sink.type").optional(); + static final io.debezium.config.Field SINK_TYPE_FIELD_FALLBACK = Field.create("name").optional(); public static Map getConfigSubset(Config config, String prefix) { final Map ret = new HashMap<>(); @@ -47,6 +51,15 @@ public static Map getConfigSubset(Config config, String prefix) return ret; } + + public static String sinkType(Configuration config) { + String type = config.getString(SINK_TYPE_FIELD, config.getString(SINK_TYPE_FIELD_FALLBACK)); + if (type == null) { + throw new DebeziumException("The config property debezium.sink.type is required " + "but it could not be found in any config source"); + } + return type; + } + public static T selectInstance(Instance instances, String name) { Instance instance = instances.select(NamedLiteral.of(name)); @@ -60,7 +73,7 @@ public static T selectInstance(Instance instances, String name) { return instance.get(); } - public static BigQuery getBQClient(Optional gcpProject, Optional bqDataset, Optional credentialsFile, String bqLocation) throws InterruptedException { + public static BigQuery bigqueryClient(Optional gcpProject, Optional bqDataset, Optional credentialsFile, String bqLocation) throws InterruptedException { if (gcpProject.isEmpty()) { throw new InterruptedException("Please provide a value for `debezium.sink.{bigquerybatch|bigquerystream}.project`"); @@ -116,7 +129,7 @@ public static TableResult executeQuery(BigQuery bqClient, String query, List records) { lock.write(() -> { try { if (exists()) { - TableResult rs = BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_STORAGE_TABLE_SELECT, tableFullName)); + TableResult rs = ConsumerUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_STORAGE_TABLE_SELECT, tableFullName)); for (FieldValueList row : rs.getValues()) { String line = row.get("history_data").getStringValue(); if (line == null) { @@ -180,7 +182,7 @@ public boolean exists() { int numRows = 0; try { - TableResult rs = BatchUtil.executeQuery(bqClient, "SELECT COUNT(*) as row_count FROM " + tableFullName); + TableResult rs = ConsumerUtil.executeQuery(bqClient, "SELECT COUNT(*) as row_count FROM " + tableFullName); for (FieldValueList row : rs.getValues()) { numRows = row.get("row_count").getNumericValue().intValue(); break; @@ -201,7 +203,7 @@ public void initializeStorage() { if (!storageExists()) { try { LOG.debug("Creating table {} to store database history", tableFullName); - BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName)); + ConsumerUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName)); LOG.warn("Created database history storage table {} to store history", tableFullName); if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) { @@ -241,45 +243,40 @@ private void loadFileSchemaHistory(File file) { } public static class BigquerySchemaHistoryConfig { - private final Configuration config; + Properties configCombined = new Properties(); public BigquerySchemaHistoryConfig(Configuration config) { - this.config = config; + String sinkType = ConsumerUtil.sinkType(config); + Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + sinkType + ".", true); + confIcebergSubset1.forEach(configCombined::put); + // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! + Map confIcebergSubset2 = ConsumerUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); + confIcebergSubset2.forEach(configCombined::putIfAbsent); } - private String getConfig(String configName, String fallbackConfigName, String defaultValue) { - return this.config.getString(configName, this.config.getString(fallbackConfigName, defaultValue)); - } public String getBigqueryProject() { - return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.project", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.project", null); + return (String) configCombined.getOrDefault("project", null); } public String getBigqueryDataset() { - return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.dataset", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.dataset", null); + return (String) configCombined.getOrDefault("dataset", null); } public String getBigqueryTable() { - return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.table-name", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.table-name", "debezium_database_history_storage" - ); + return (String) configCombined.getOrDefault("bigquery.table-name", "debezium_database_history_storage"); } public String getMigrateHistoryFile() { - return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.migrate-history-file", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.migrate-history-file", ""); + return (String) configCombined.getOrDefault("bigquery.migrate-history-file", ""); } public String getBigqueryCredentialsFile() { - return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.credentials-file", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.credentials-file", ""); + return (String) configCombined.getOrDefault("credentials-file", ""); } public String getBigqueryLocation() { - return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.location", - CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.location", "US"); + return (String) configCombined.getOrDefault("location", "US"); } } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java index 40f1e41..d9f96c9 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java @@ -8,27 +8,16 @@ package io.debezium.server.bigquery.offset; -import io.debezium.DebeziumException; -import io.debezium.config.Configuration; -import io.debezium.config.Field; -import io.debezium.server.bigquery.BatchUtil; -import io.debezium.util.Strings; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.*; -import java.util.concurrent.Future; - import autovalue.shaded.com.google.common.collect.ImmutableList; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.cloud.bigquery.*; +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.server.bigquery.ConsumerUtil; +import io.debezium.util.Strings; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -36,9 +25,20 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.SafeObjectInputStream; +import org.eclipse.microprofile.config.ConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.*; +import java.util.concurrent.Future; + /** * Implementation of OffsetBackingStore that saves data to database table. */ @@ -79,7 +79,7 @@ public void configure(WorkerConfig config) { this.offsetConfig = new BigqueryOffsetBackingStoreConfig(Configuration.from(config.originalsStrings())); try { - bqClient = BatchUtil.getBQClient( + bqClient = ConsumerUtil.bigqueryClient( Optional.ofNullable(this.offsetConfig.getBigqueryProject()), Optional.ofNullable(this.offsetConfig.getBigqueryDataset()), Optional.ofNullable(this.offsetConfig.getBigqueryCredentialsFile()), @@ -110,10 +110,10 @@ private void initializeTable() throws SQLException { Table table = bqClient.getTable(tableId); if (table == null) { LOG.debug("Creating table {} to store offset", tableFullName); - BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName)); + ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName)); LOG.warn("Created offset storage table {} to store offset", tableFullName); - - if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())){ + + if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())) { LOG.warn("Loading offset from file {}", offsetConfig.getMigrateOffsetFile()); this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile())); } @@ -123,11 +123,11 @@ private void initializeTable() throws SQLException { protected void save() { LOG.debug("Saving offset data to bigquery table..."); try { - BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DELETE, tableFullName)); + ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DELETE, tableFullName)); String dataJson = mapper.writeValueAsString(data); LOG.debug("Saving offset data {}", dataJson); Timestamp currentTs = new Timestamp(System.currentTimeMillis()); - BatchUtil.executeQuery(bqClient, + ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_INSERT, tableFullName), ImmutableList.of( QueryParameterValue.string(UUID.randomUUID().toString()), @@ -145,7 +145,7 @@ protected void save() { private void load() { try { String dataJsonString = null; - TableResult rs = BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_SELECT, tableFullName)); + TableResult rs = ConsumerUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_SELECT, tableFullName)); for (FieldValueList row : rs.getValues()) { dataJsonString = row.get("offset_data").getStringValue(); break; @@ -166,10 +166,10 @@ private void load() { private void loadFileOffset(File file) { try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) { Object obj = is.readObject(); - + if (!(obj instanceof HashMap)) throw new ConnectException("Expected HashMap but found " + obj.getClass()); - + Map raw = (Map) obj; for (Map.Entry mapEntry : raw.entrySet()) { ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; @@ -179,10 +179,11 @@ private void loadFileOffset(File file) { } catch (IOException | ClassNotFoundException e) { throw new DebeziumException("Failed migrating offset from file", e); } - + LOG.warn("Loaded file offset, saving it to Bigquery offset storage"); save(); } + @Override public Future set(final Map values, final Callback callback) { @@ -225,47 +226,43 @@ public Set> connectorPartitions(String connectorName) { } public static class BigqueryOffsetBackingStoreConfig extends WorkerConfig { - private final Configuration config; + Properties configCombined = new Properties(); static final Field SINK_TYPE_FIELD = Field.create("debezium.sink.type").optional(); static final Field SINK_TYPE_FIELD_FALLBACK = Field.create("name").optional(); public BigqueryOffsetBackingStoreConfig(Configuration config) { super(new ConfigDef(), config.asMap()); - this.config = config; - } - - public String sinkType() { - String type = this.config.getString(SINK_TYPE_FIELD, this.config.getString(SINK_TYPE_FIELD_FALLBACK)); - if (type == null) { - throw new DebeziumException("The config property debezium.sink.type is required " + - "but it could not be found in any config source"); - } - return type; + String sinkType = ConsumerUtil.sinkType(config); + Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + sinkType + ".", true); + confIcebergSubset1.forEach(configCombined::put); + // debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs! + Map confIcebergSubset2 = ConsumerUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + "."); + confIcebergSubset2.forEach(configCombined::putIfAbsent); } public String getBigqueryProject() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.project", this.sinkType()))); + return (String) configCombined.getOrDefault("project", null); } public String getBigqueryDataset() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.dataset", this.sinkType()))); + return (String) configCombined.getOrDefault("dataset", null); } public String getBigqueryTable() { - return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.table-name").withDefault("debezium_offset_storage")); + return (String) configCombined.getOrDefault("bigquery.table-name", "debezium_offset_storage"); } public String getMigrateOffsetFile() { - return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.migrate-offset-file").withDefault("")); + return (String) configCombined.getOrDefault("bigquery.migrate-offset-file", ""); } public String getBigqueryCredentialsFile() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.credentialsFile", this.sinkType())).withDefault("")); + return (String) configCombined.getOrDefault("credentials-file", ""); } public String getBigqueryLocation() { - return this.config.getString(Field.create(String.format(CONFIGURATION_FIELD_PREFIX_STRING + "%s.location", this.sinkType())).withDefault("US")); + return (String) configCombined.getOrDefault("location", "US"); } } diff --git a/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/BatchUtilTest.java b/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/ConsumerUtilTest.java similarity index 96% rename from debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/BatchUtilTest.java rename to debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/ConsumerUtilTest.java index 6850b9e..65fba4d 100644 --- a/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/BatchUtilTest.java +++ b/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/ConsumerUtilTest.java @@ -8,24 +8,24 @@ package io.debezium.server.bigquery; +import com.fasterxml.jackson.databind.JsonNode; import io.debezium.serde.DebeziumSerdes; +import org.apache.kafka.common.serialization.Serde; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.kafka.common.serialization.Serde; -import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; -class BatchUtilTest { +class ConsumerUtilTest { final String serdeWithSchema = Files.readString(Path.of("src/test/resources/json/serde-with-schema.json")); final String unwrapWithSchema = Files.readString(Path.of("src/test/resources/json/unwrap-with-schema.json")); - BatchUtilTest() throws IOException { + ConsumerUtilTest() throws IOException { } @Test