Skip to content

Commit

Permalink
Improve code naming
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Sep 23, 2024
1 parent 57c7ca4 commit a800cfd
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public long uploadDestination(String destination, List<RecordConverter> data) {
TableId tableId = getTableId(destination);

RecordConverter sampleEvent = data.get(0);
Schema schema = sampleEvent.tableSchema(false, false);
Schema schema = sampleEvent.tableSchema(false);
if (schema == null) {
schema = bqClient.getTable(tableId).getDefinition().getSchema();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public RecordConverter(String destination, JsonNode value, JsonNode key, JsonNod
this.keySchema = keySchema;
}

private static ArrayList<Field> schemaFields(JsonNode schemaNode, Boolean binaryAsString,
boolean isStream) {
private static ArrayList<Field> schemaFields(JsonNode schemaNode, Boolean binaryAsString) {

ArrayList<Field> fields = new ArrayList<>();

Expand All @@ -79,20 +78,20 @@ private static ArrayList<Field> schemaFields(JsonNode schemaNode, Boolean binary
switch (fieldType) {
case "struct":
// recursive call for nested fields
ArrayList<Field> subFields = schemaFields(jsonSchemaFieldNode, binaryAsString, isStream);
ArrayList<Field> subFields = schemaFields(jsonSchemaFieldNode, binaryAsString);
fields.add(Field.newBuilder(fieldName, StandardSQLTypeName.STRUCT, FieldList.of(subFields)).build());
break;
default:
// default to String type
fields.add(schemaPrimitiveField(fieldType, fieldName, fieldSemanticType, binaryAsString, isStream));
fields.add(schemaPrimitiveField(fieldType, fieldName, fieldSemanticType, binaryAsString));
break;
}
}

return fields;
}

private static Field schemaPrimitiveField(String fieldType, String fieldName, String fieldSemanticType, boolean binaryAsString, boolean isStream) {
private static Field schemaPrimitiveField(String fieldType, String fieldName, String fieldSemanticType, boolean binaryAsString) {
switch (fieldType) {
case "int8":
case "int16":
Expand Down Expand Up @@ -292,8 +291,8 @@ public Clustering tableClustering(String clusteringField) {
}
}

public Schema tableSchema(Boolean binaryAsString, boolean isStream) {
ArrayList<Field> fields = schemaFields(this.valueSchema(), binaryAsString, isStream);
public Schema tableSchema(Boolean binaryAsString) {
ArrayList<Field> fields = schemaFields(this.valueSchema(), binaryAsString);

if (fields.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,15 @@ private Table getTable(String destination, RecordConverter sampleBqEvent) {
// create table if missing
if (createIfNeeded && table == null) {
table = this.createTable(tableId,
sampleBqEvent.tableSchema(true, true),
sampleBqEvent.tableSchema(true),
sampleBqEvent.tableClustering(clusteringField),
sampleBqEvent.tableConstraints()
);
}

// alter table schema add new fields
if (allowFieldAddition && table != null) {
table = this.updateTableSchema(table, sampleBqEvent.tableSchema(true, true), destination);
table = this.updateTableSchema(table, sampleBqEvent.tableSchema(true), destination);
}
return table;
}
Expand Down

0 comments on commit a800cfd

Please sign in to comment.