Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve BigquerySchemaHistory, BigqueryOffsetBackingStore classes #192

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void initizalize() throws InterruptedException {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

batchSizeWait = BatchUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
batchSizeWait = ConsumerUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
LOGGER.info("Using {} to optimize batch size", batchSizeWait.getClass().getSimpleName());
batchSizeWait.initizalize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void connect() throws InterruptedException {

public void initizalize() throws InterruptedException {
super.initizalize();
bqClient = BatchUtil.getBQClient(gcpProject, bqDataset, credentialsFile , bqLocation);
bqClient = ConsumerUtil.bigqueryClient(gcpProject, bqDataset, credentialsFile, bqLocation);
timePartitioning =
TimePartitioning.newBuilder(TimePartitioning.Type.valueOf(partitionType)).setField(partitionField).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@

package io.debezium.server.bigquery;

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.*;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
Expand All @@ -18,21 +28,15 @@
import java.util.Map;
import java.util.Optional;

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.*;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author Ismail Simsek
*/
public class BatchUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(BatchUtil.class);
public class ConsumerUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(ConsumerUtil.class);

static final io.debezium.config.Field SINK_TYPE_FIELD = io.debezium.config.Field.create("debezium.sink.type").optional();
static final io.debezium.config.Field SINK_TYPE_FIELD_FALLBACK = Field.create("name").optional();

public static Map<String, String> getConfigSubset(Config config, String prefix) {
final Map<String, String> ret = new HashMap<>();
Expand All @@ -47,6 +51,15 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
return ret;
}


/**
 * Resolves the configured Debezium sink type.
 * <p>
 * Reads {@code debezium.sink.type} ({@code SINK_TYPE_FIELD}); when absent, falls back to
 * the {@code name} property ({@code SINK_TYPE_FIELD_FALLBACK}).
 *
 * @param config the Debezium configuration to inspect
 * @return the configured sink type, never {@code null}
 * @throws DebeziumException when neither config property is present in any config source
 */
public static String sinkType(Configuration config) {
  // Primary field first; the nested getString supplies the fallback value.
  String type = config.getString(SINK_TYPE_FIELD, config.getString(SINK_TYPE_FIELD_FALLBACK));
  if (type == null) {
    // Fix: merged the needlessly split adjacent string literals ("required " + "but ...")
    // into a single literal; the runtime message is unchanged.
    throw new DebeziumException("The config property debezium.sink.type is required but it could not be found in any config source");
  }
  return type;
}

public static <T> T selectInstance(Instance<T> instances, String name) {

Instance<T> instance = instances.select(NamedLiteral.of(name));
Expand All @@ -60,7 +73,7 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
return instance.get();
}

public static BigQuery getBQClient(Optional<String> gcpProject, Optional<String> bqDataset, Optional<String> credentialsFile, String bqLocation) throws InterruptedException {
public static BigQuery bigqueryClient(Optional<String> gcpProject, Optional<String> bqDataset, Optional<String> credentialsFile, String bqLocation) throws InterruptedException {

if (gcpProject.isEmpty()) {
throw new InterruptedException("Please provide a value for `debezium.sink.{bigquerybatch|bigquerystream}.project`");
Expand Down Expand Up @@ -116,7 +129,7 @@ public static TableResult executeQuery(BigQuery bqClient, String query, List<Que
}

/**
 * Convenience overload that runs {@code query} with no query parameters.
 * <p>
 * Delegates to {@link #executeQuery(BigQuery, String, List)} passing {@code null}
 * for the parameter list.
 *
 * @param bqClient the BigQuery client to execute the query with
 * @param query    the SQL text to execute
 * @return the query result
 * @throws SQLException when query execution fails
 */
public static TableResult executeQuery(BigQuery bqClient, String query) throws SQLException {
  // Same-class call; the explicit ConsumerUtil qualifier is redundant.
  return executeQuery(bqClient, query, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void closeStreams() {
public void initizalize() throws InterruptedException {
super.initizalize();

bqClient = BatchUtil.getBQClient(gcpProject, bqDataset, credentialsFile, bqLocation);
bqClient = ConsumerUtil.bigqueryClient(gcpProject, bqDataset, credentialsFile, bqLocation);

timePartitioning =
TimePartitioning.newBuilder(TimePartitioning.Type.valueOf(partitionType)).setField(partitionField).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,36 @@

package io.debezium.server.bigquery.history;

import autovalue.shaded.com.google.common.collect.ImmutableList;
import com.google.cloud.bigquery.*;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.server.bigquery.BatchUtil;
import io.debezium.server.bigquery.ConsumerUtil;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import autovalue.shaded.com.google.common.collect.ImmutableList;
import com.google.cloud.bigquery.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link SchemaHistory} implementation that stores the schema history to database table
*
Expand Down Expand Up @@ -71,7 +73,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
super.configure(config, comparator, listener, useCatalogBeforeSchema);
this.historyConfig = new BigquerySchemaHistoryConfig(config);
try {
bqClient = BatchUtil.getBQClient(
bqClient = ConsumerUtil.bigqueryClient(
Optional.ofNullable(this.historyConfig.getBigqueryProject()),
Optional.ofNullable(this.historyConfig.getBigqueryDataset()),
Optional.ofNullable(this.historyConfig.getBigqueryCredentialsFile()),
Expand Down Expand Up @@ -122,7 +124,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
String recordDocString = writer.write(record.document());
LOG.trace("Saving history data {}", recordDocString);
Timestamp currentTs = new Timestamp(System.currentTimeMillis());
BatchUtil.executeQuery(bqClient,
ConsumerUtil.executeQuery(bqClient,
String.format(DATABASE_HISTORY_STORAGE_TABLE_INSERT, tableFullName),
ImmutableList.of(
QueryParameterValue.string(UUID.randomUUID().toString()),
Expand All @@ -148,7 +150,7 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
lock.write(() -> {
try {
if (exists()) {
TableResult rs = BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_STORAGE_TABLE_SELECT, tableFullName));
TableResult rs = ConsumerUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_STORAGE_TABLE_SELECT, tableFullName));
for (FieldValueList row : rs.getValues()) {
String line = row.get("history_data").getStringValue();
if (line == null) {
Expand Down Expand Up @@ -180,7 +182,7 @@ public boolean exists() {

int numRows = 0;
try {
TableResult rs = BatchUtil.executeQuery(bqClient, "SELECT COUNT(*) as row_count FROM " + tableFullName);
TableResult rs = ConsumerUtil.executeQuery(bqClient, "SELECT COUNT(*) as row_count FROM " + tableFullName);
for (FieldValueList row : rs.getValues()) {
numRows = row.get("row_count").getNumericValue().intValue();
break;
Expand All @@ -201,7 +203,7 @@ public void initializeStorage() {
if (!storageExists()) {
try {
LOG.debug("Creating table {} to store database history", tableFullName);
BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName));
ConsumerUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName));
LOG.warn("Created database history storage table {} to store history", tableFullName);

if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
Expand Down Expand Up @@ -241,45 +243,40 @@ private void loadFileSchemaHistory(File file) {
}

public static class BigquerySchemaHistoryConfig {
private final Configuration config;
Properties configCombined = new Properties();

public BigquerySchemaHistoryConfig(Configuration config) {
this.config = config;
String sinkType = ConsumerUtil.sinkType(config);
Configuration confIcebergSubset1 = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + sinkType + ".", true);
confIcebergSubset1.forEach(configCombined::put);
// debezium is doing config filtering before passing it down to this class! so we are taking unfiltered configs!
Map<String, String> confIcebergSubset2 = ConsumerUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink." + sinkType + ".");
confIcebergSubset2.forEach(configCombined::putIfAbsent);
}

private String getConfig(String configName, String fallbackConfigName, String defaultValue) {
return this.config.getString(configName, this.config.getString(fallbackConfigName, defaultValue));
}

public String getBigqueryProject() {
return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.project",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.project", null);
return (String) configCombined.getOrDefault("project", null);
}

public String getBigqueryDataset() {
return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.dataset",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.dataset", null);
return (String) configCombined.getOrDefault("dataset", null);
}

public String getBigqueryTable() {
return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.table-name",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.table-name", "debezium_database_history_storage"
);
return (String) configCombined.getOrDefault("bigquery.table-name", "debezium_database_history_storage");
}

public String getMigrateHistoryFile() {
return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.migrate-history-file",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.migrate-history-file", "");
return (String) configCombined.getOrDefault("bigquery.migrate-history-file", "");
}

public String getBigqueryCredentialsFile() {
return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.credentials-file",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.credentials-file", "");
return (String) configCombined.getOrDefault("credentials-file", "");
}

public String getBigqueryLocation() {
return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.location",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.location", "US");
return (String) configCombined.getOrDefault("location", "US");
}
}

Expand Down
Loading
Loading