Skip to content

Commit

Permalink
chore: Add E2E test for Flexible Column Names with JsonStreamWriter (#…
Browse files Browse the repository at this point in the history
…2500)

* Add E2E test for Flexible Column Names with JsonStreamWriter

* Add E2E test for Flexible Column Names with JsonStreamWriter

* adjust the format

* Fix schema update test and remove comments
  • Loading branch information
tracyz-g authored May 15, 2024
1 parent b1b62b1 commit 05ebe17
Showing 1 changed file with 263 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,269 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
}
}

@Test
public void testJsonStreamWriterWithFlexibleColumnName()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "FlexibleColumnTable";
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"test-str列", StandardSQLTypeName.STRING)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"test-numerics列", StandardSQLTypeName.NUMERIC)
.setMode(Field.Mode.REPEATED)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"test-datetime列", StandardSQLTypeName.DATETIME)
.build())))
.build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
LOG.info("Sending one message");
JSONObject row1 = new JSONObject();
row1.put("test-str列", "aaa");
row1.put(
"test-numerics列",
new JSONArray(
new byte[][] {
BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("123.4"))
.toByteArray(),
BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000"))
.toByteArray()
}));
row1.put(
"test-datetime列",
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));
JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1});

ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);

assertEquals(0, response1.get().getAppendResult().getOffset().getValue());

JSONObject row2 = new JSONObject();
row2.put("test-str列", "bbb");
JSONObject row3 = new JSONObject();
row3.put("test-str列", "ccc");
JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(row2);
jsonArr2.put(row3);

JSONObject row4 = new JSONObject();
row4.put("test-str列", "ddd");
JSONArray jsonArr3 = new JSONArray();
jsonArr3.put(row4);

LOG.info("Sending two more messages");
ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
LOG.info("Sending one more message");
ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(jsonArr3, -1);
assertEquals(1, response2.get().getAppendResult().getOffset().getValue());
assertEquals(3, response3.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(
tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
FieldValueList currentRow = iter.next();
assertEquals("aaa", currentRow.get(0).getStringValue());
assertEquals("-9000000", currentRow.get(1).getRepeatedValue().get(1).getStringValue());
assertEquals("2020-10-01T12:00:00", currentRow.get(2).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals("ddd", iter.next().get(0).getStringValue());
assertEquals(false, iter.hasNext());
}
}

@Test
public void testJsonStreamWriterWithNestedFlexibleColumnName()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "NestedFlexibleColumnTable";
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"test-str列", StandardSQLTypeName.STRING)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"test-record列",
StandardSQLTypeName.STRUCT,
com.google.cloud.bigquery.Field.of(
"nested-str列", StandardSQLTypeName.STRING),
com.google.cloud.bigquery.Field.of(
"nested-int列", StandardSQLTypeName.INT64))
.setMode(Field.Mode.REPEATED)
.build())))
.build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
LOG.info("Sending one message");
JSONObject row1 = new JSONObject();
row1.put("test-str列", "aaa");
JSONObject record1 = new JSONObject();
record1.put("nested-str列", "nested-str1");
record1.put("nested-int列", 10);
row1.put("test-record列", new JSONArray(new JSONObject[] {record1}));
JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1});

ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);

assertEquals(0, response1.get().getAppendResult().getOffset().getValue());

JSONObject row2 = new JSONObject();
row2.put("test-str列", "bbb");
JSONObject row3 = new JSONObject();
row3.put("test-str列", "ccc");
JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(row2);
jsonArr2.put(row3);

JSONObject row4 = new JSONObject();
row4.put("test-str列", "ddd");
JSONObject record2 = new JSONObject();
record2.put("nested-str列", "nested-str2");
record2.put("nested-int列", 20);
row4.put("test-record列", new JSONArray(new JSONObject[] {record2}));
JSONArray jsonArr3 = new JSONArray();
jsonArr3.put(row4);

LOG.info("Sending two more messages");
ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
LOG.info("Sending one more message");
ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(jsonArr3, -1);
assertEquals(1, response2.get().getAppendResult().getOffset().getValue());
assertEquals(3, response3.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(
tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
FieldValueList currentRow = iter.next();
assertEquals("aaa", currentRow.get(0).getStringValue());
FieldValueList currentRecord = currentRow.get(1).getRepeatedValue().get(0).getRecordValue();
assertEquals("nested-str1", currentRecord.get(0).getStringValue());
assertEquals("10", currentRecord.get(1).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
FieldValueList lastRow = iter.next();
assertEquals("ddd", lastRow.get(0).getStringValue());
FieldValueList lastRecord = lastRow.get(1).getRepeatedValue().get(0).getRecordValue();
assertEquals("nested-str2", lastRecord.get(0).getStringValue());
assertEquals("20", lastRecord.get(1).getStringValue());
assertEquals(false, iter.hasNext());
}
}

@Test
public void testJsonStreamWriterSchemaUpdateWithFlexibleColumnName()
throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {
String tableName = "SchemaUpdateFlexColumnTestTable";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1-列", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
// write the 1st row
JSONObject foo = new JSONObject();
foo.put("col1-列", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, 0);
assertEquals(0, response.get().getAppendResult().getOffset().getValue());

// update schema with a new column
Field col2 = Field.newBuilder("col2-列", StandardSQLTypeName.STRING).build();
Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2));
TableInfo updatedTableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
Table updatedTable = bigquery.update(updatedTableInfo);
assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());

// continue writing rows until backend acknowledges schema update
JSONObject foo2 = new JSONObject();
foo2.put("col1-列", "bbb");
JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(foo2);

int next = 0;
for (int i = 1; i < 100; i++) {
ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
if (response2.get().hasUpdatedSchema()) {
next = i;
break;
} else {
Thread.sleep(1000);
}
}

// write rows with updated schema.
JSONObject updatedFoo = new JSONObject();
updatedFoo.put("col1-列", "ccc");
updatedFoo.put("col2-列", "ddd");
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);
for (int i = 0; i < 10; i++) {
ApiFuture<AppendRowsResponse> response3 =
jsonStreamWriter.append(updatedJsonArr, next + 1 + i);
assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
}

// verify table data correctness
Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();
// 1 row of aaa
assertEquals("aaa", rowsIter.next().get(0).getStringValue());
// a few rows of bbb
for (int j = 1; j <= next; j++) {
assertEquals("bbb", rowsIter.next().get(0).getStringValue());
}
// 10 rows of ccc, ddd
for (int j = next + 1; j < next + 1 + 10; j++) {
FieldValueList temp = rowsIter.next();
assertEquals("ccc", temp.get(0).getStringValue());
assertEquals("ddd", temp.get(1).getStringValue());
}
assertFalse(rowsIter.hasNext());
}
}

@Test
public void testComplicateSchemaWithPendingStream()
throws IOException, InterruptedException, ExecutionException {
Expand Down

0 comments on commit 05ebe17

Please sign in to comment.