diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 496b406cda..a1ab8c1dc4 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -1244,6 +1244,269 @@ public void testJsonStreamWriterSchemaUpdateConcurrent() } } + @Test + public void testJsonStreamWriterWithFlexibleColumnName() + throws IOException, InterruptedException, ExecutionException, + Descriptors.DescriptorValidationException { + String tableName = "FlexibleColumnTable"; + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder( + "test-str列", StandardSQLTypeName.STRING) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "test-numerics列", StandardSQLTypeName.NUMERIC) + .setMode(Field.Mode.REPEATED) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "test-datetime列", StandardSQLTypeName.DATETIME) + .build()))) + .build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) { + LOG.info("Sending one message"); + JSONObject row1 = new JSONObject(); + row1.put("test-str列", "aaa"); + row1.put( + "test-numerics列", + new JSONArray( + new byte[][] { + BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("123.4")) + .toByteArray(), + BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000")) + .toByteArray() + })); + row1.put( + "test-datetime列", + CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0))); + JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1}); + + ApiFuture response1 = jsonStreamWriter.append(jsonArr1, -1); + + assertEquals(0, response1.get().getAppendResult().getOffset().getValue()); + + JSONObject row2 = new JSONObject(); + row2.put("test-str列", "bbb"); + JSONObject row3 = new JSONObject(); + row3.put("test-str列", "ccc"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(row2); + jsonArr2.put(row3); + + JSONObject row4 = new JSONObject(); + row4.put("test-str列", "ddd"); + JSONArray jsonArr3 = new JSONArray(); + jsonArr3.put(row4); + + LOG.info("Sending two more messages"); + ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1); + LOG.info("Sending one more message"); + ApiFuture response3 = jsonStreamWriter.append(jsonArr3, -1); + assertEquals(1, response2.get().getAppendResult().getOffset().getValue()); + assertEquals(3, response3.get().getAppendResult().getOffset().getValue()); + + TableResult result = + bigquery.listTableData( + tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + FieldValueList currentRow = iter.next(); + assertEquals("aaa", currentRow.get(0).getStringValue()); + assertEquals("-9000000", currentRow.get(1).getRepeatedValue().get(1).getStringValue()); + assertEquals("2020-10-01T12:00:00", currentRow.get(2).getStringValue()); + assertEquals("bbb", iter.next().get(0).getStringValue()); + assertEquals("ccc", iter.next().get(0).getStringValue()); + assertEquals("ddd", iter.next().get(0).getStringValue()); + assertEquals(false, iter.hasNext()); + } + } + + @Test + public void testJsonStreamWriterWithNestedFlexibleColumnName() + throws IOException, InterruptedException, ExecutionException, + Descriptors.DescriptorValidationException { + String tableName = "NestedFlexibleColumnTable"; + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder( + "test-str列", StandardSQLTypeName.STRING) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "test-record列", + StandardSQLTypeName.STRUCT, + com.google.cloud.bigquery.Field.of( + "nested-str列", StandardSQLTypeName.STRING), + com.google.cloud.bigquery.Field.of( + "nested-int列", StandardSQLTypeName.INT64)) + .setMode(Field.Mode.REPEATED) + .build()))) + .build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) { + LOG.info("Sending one message"); + JSONObject row1 = new JSONObject(); + row1.put("test-str列", "aaa"); + JSONObject record1 = new JSONObject(); + record1.put("nested-str列", "nested-str1"); + record1.put("nested-int列", 10); + row1.put("test-record列", new JSONArray(new JSONObject[] {record1})); + JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1}); + + ApiFuture response1 = jsonStreamWriter.append(jsonArr1, -1); + + assertEquals(0, response1.get().getAppendResult().getOffset().getValue()); + + JSONObject row2 = new JSONObject(); + row2.put("test-str列", "bbb"); + JSONObject row3 = new JSONObject(); + row3.put("test-str列", "ccc"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(row2); + jsonArr2.put(row3); + + JSONObject row4 = new JSONObject(); + row4.put("test-str列", "ddd"); + JSONObject record2 = new JSONObject(); + record2.put("nested-str列", "nested-str2"); + record2.put("nested-int列", 20); + row4.put("test-record列", new JSONArray(new JSONObject[] {record2})); + JSONArray jsonArr3 = new JSONArray(); + jsonArr3.put(row4); + + LOG.info("Sending two more messages"); + ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1); + LOG.info("Sending one more message"); + ApiFuture response3 = jsonStreamWriter.append(jsonArr3, -1); + assertEquals(1, response2.get().getAppendResult().getOffset().getValue()); + assertEquals(3, response3.get().getAppendResult().getOffset().getValue()); + + TableResult result = + bigquery.listTableData( + tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + FieldValueList currentRow = iter.next(); + assertEquals("aaa", currentRow.get(0).getStringValue()); + FieldValueList currentRecord = currentRow.get(1).getRepeatedValue().get(0).getRecordValue(); + assertEquals("nested-str1", currentRecord.get(0).getStringValue()); + assertEquals("10", currentRecord.get(1).getStringValue()); + assertEquals("bbb", iter.next().get(0).getStringValue()); + assertEquals("ccc", iter.next().get(0).getStringValue()); + FieldValueList lastRow = iter.next(); + assertEquals("ddd", lastRow.get(0).getStringValue()); + FieldValueList lastRecord = lastRow.get(1).getRepeatedValue().get(0).getRecordValue(); + assertEquals("nested-str2", lastRecord.get(0).getStringValue()); + assertEquals("20", lastRecord.get(1).getStringValue()); + assertEquals(false, iter.hasNext()); + } + } + + @Test + public void testJsonStreamWriterSchemaUpdateWithFlexibleColumnName() + throws DescriptorValidationException, IOException, InterruptedException, ExecutionException { + String tableName = "SchemaUpdateFlexColumnTestTable"; + TableId tableId = TableId.of(DATASET, tableName); + Field col1 = Field.newBuilder("col1-列", StandardSQLTypeName.STRING).build(); + Schema originalSchema = Schema.of(col1); + TableInfo tableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) { + // write the 1st row + JSONObject foo = new JSONObject(); + foo.put("col1-列", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture response = jsonStreamWriter.append(jsonArr, 0); + assertEquals(0, response.get().getAppendResult().getOffset().getValue()); + + // update schema with a new column + Field col2 = Field.newBuilder("col2-列", StandardSQLTypeName.STRING).build(); + Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2)); + TableInfo updatedTableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build(); + Table updatedTable = bigquery.update(updatedTableInfo); + assertEquals(updatedSchema, updatedTable.getDefinition().getSchema()); + + // continue writing rows until backend acknowledges schema update + JSONObject foo2 = new JSONObject(); + foo2.put("col1-列", "bbb"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(foo2); + + int next = 0; + for (int i = 1; i < 100; i++) { + ApiFuture response2 = jsonStreamWriter.append(jsonArr2, i); + assertEquals(i, response2.get().getAppendResult().getOffset().getValue()); + if (response2.get().hasUpdatedSchema()) { + next = i; + break; + } else { + Thread.sleep(1000); + } + } + + // write rows with updated schema. + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("col1-列", "ccc"); + updatedFoo.put("col2-列", "ddd"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + for (int i = 0; i < 10; i++) { + ApiFuture response3 = + jsonStreamWriter.append(updatedJsonArr, next + 1 + i); + assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue()); + } + + // verify table data correctness + Iterator rowsIter = bigquery.listTableData(tableId).getValues().iterator(); + // 1 row of aaa + assertEquals("aaa", rowsIter.next().get(0).getStringValue()); + // a few rows of bbb + for (int j = 1; j <= next; j++) { + assertEquals("bbb", rowsIter.next().get(0).getStringValue()); + } + // 10 rows of ccc, ddd + for (int j = next + 1; j < next + 1 + 10; j++) { + FieldValueList temp = rowsIter.next(); + assertEquals("ccc", temp.get(0).getStringValue()); + assertEquals("ddd", temp.get(1).getStringValue()); + } + assertFalse(rowsIter.hasNext()); + } + } + @Test public void testComplicateSchemaWithPendingStream() throws IOException, InterruptedException, ExecutionException {