diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java index 67cbf16..42f8c9d 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java @@ -99,7 +99,7 @@ public long uploadDestination(String destination, List data) { TableId tableId = getTableId(destination); RecordConverter sampleEvent = data.get(0); - Schema schema = sampleEvent.tableSchema(false, false); + Schema schema = sampleEvent.tableSchema(false); if (schema == null) { schema = bqClient.getTable(tableId).getDefinition().getSchema(); } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java index 120100c..3de002c 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java @@ -51,8 +51,7 @@ public RecordConverter(String destination, JsonNode value, JsonNode key, JsonNod this.keySchema = keySchema; } - private static ArrayList schemaFields(JsonNode schemaNode, Boolean binaryAsString, - boolean isStream) { + private static ArrayList schemaFields(JsonNode schemaNode, Boolean binaryAsString) { ArrayList fields = new ArrayList<>(); @@ -79,12 +78,12 @@ private static ArrayList schemaFields(JsonNode schemaNode, Boolean binary switch (fieldType) { case "struct": // recursive call for nested fields - ArrayList subFields = schemaFields(jsonSchemaFieldNode, binaryAsString, isStream); + ArrayList subFields = schemaFields(jsonSchemaFieldNode, binaryAsString); fields.add(Field.newBuilder(fieldName, StandardSQLTypeName.STRUCT, FieldList.of(subFields)).build()); break; default: // default to String type - fields.add(schemaPrimitiveField(fieldType, fieldName, fieldSemanticType, binaryAsString, isStream)); + fields.add(schemaPrimitiveField(fieldType, fieldName, fieldSemanticType, binaryAsString)); break; } } @@ -92,7 +91,7 @@ private static ArrayList schemaFields(JsonNode schemaNode, Boolean binary return fields; } - private static Field schemaPrimitiveField(String fieldType, String fieldName, String fieldSemanticType, boolean binaryAsString, boolean isStream) { + private static Field schemaPrimitiveField(String fieldType, String fieldName, String fieldSemanticType, boolean binaryAsString) { switch (fieldType) { case "int8": case "int16": @@ -292,8 +291,8 @@ public Clustering tableClustering(String clusteringField) { } } - public Schema tableSchema(Boolean binaryAsString, boolean isStream) { - ArrayList fields = schemaFields(this.valueSchema(), binaryAsString, isStream); + public Schema tableSchema(Boolean binaryAsString) { + ArrayList fields = schemaFields(this.valueSchema(), binaryAsString); if (fields.isEmpty()) { return null; diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java index 247dc6b..85eaef9 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java @@ -250,7 +250,7 @@ private Table getTable(String destination, RecordConverter sampleBqEvent) { // create table if missing if (createIfNeeded && table == null) { table = this.createTable(tableId, - sampleBqEvent.tableSchema(true, true), + sampleBqEvent.tableSchema(true), sampleBqEvent.tableClustering(clusteringField), sampleBqEvent.tableConstraints() ); @@ -258,7 +258,7 @@ private Table getTable(String destination, RecordConverter sampleBqEvent) { // alter table schema add new fields if (allowFieldAddition && table != null) { - table = this.updateTableSchema(table, sampleBqEvent.tableSchema(true, true), destination); + table = this.updateTableSchema(table, sampleBqEvent.tableSchema(true), destination); } return table; }