From e6e40311f7b0b4acaa62d0d1fa9ee516f426710c Mon Sep 17 00:00:00 2001 From: James Gill Date: Tue, 15 Oct 2024 11:09:18 -0400 Subject: [PATCH] Add support for de/serializing list-encoded JSON structs [#6558] Currently, a StructArray can only be deserialized from or serialized to a JSON object (e.g. `{a: 1, b: "c"}`), but some services (e.g. Presto and Trino) encode ROW types as JSON lists (e.g. `[1, "c"]`) because this is more compact, and the schema is known. This PR adds the ability to encode and decode JSON lists from and to StructArrays, if StructMode is set to ListOnly. In ListOnly mode, object-encoded structs raise an error. Setting to ObjectOnly (the default) has the original parsing behavior. Some notes/questions/points for discussion: 1. I've made a JsonParseMode struct instead of a bool flag for two reasons. One is that it's self-descriptive (what would `true` be?), and the other is that it allows a future Mixed mode that could deserialize either. The latter isn't currently requested by anyone. 2. I kept the error messages as similar to the old messages as possible. I considered having more specific error messages (like "Encountered a '[' when parsing a Struct, but the StructParseMode is ObjectOnly" or similar), but wanted to hear opinions before I went that route. 3. I'm not attached to any name/code-style/etc, so happy to modify to fit local conventions. 
Fixes #6558 --- arrow-json/src/lib.rs | 96 +++++++++ arrow-json/src/reader/list_array.rs | 3 + arrow-json/src/reader/map_array.rs | 4 + arrow-json/src/reader/mod.rs | 282 +++++++++++++++++++++++++- arrow-json/src/reader/struct_array.rs | 109 +++++++--- arrow-json/src/writer/encoder.rs | 24 ++- arrow-json/src/writer/mod.rs | 85 +++++++- 7 files changed, 557 insertions(+), 46 deletions(-) diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs index b6c441012b2a..01d9fb8e7ded 100644 --- a/arrow-json/src/lib.rs +++ b/arrow-json/src/lib.rs @@ -74,6 +74,34 @@ pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer, WriterBuilder}; use half::f16; use serde_json::{Number, Value}; +/// Specifies what is considered valid JSON when reading or writing +/// RecordBatches or StructArrays. +/// +/// This enum controls which form(s) the Reader will accept and which form the +/// Writer will produce. For example, if the RecordBatch Schema is +/// `[("a", Int32), ("r", Struct([("b", Boolean), ("c", Utf8)]))]` +/// then a Reader with [`StructMode::ObjectOnly`] would read rows of the form +/// `{"a": 1, "r": {"b": true, "c": "cat"}}` while with [`StructMode::ListOnly`] +/// would read rows of the form `[1, [true, "cat"]]`. A Writer would produce +/// rows formatted similarly. +/// +/// The list encoding is more compact if the schema is known, and is used by +/// tools such as +/// [Presto](https://prestodb.io/docs/current/develop/client-protocol.html#important-queryresults-attributes) +/// and [Trino](https://trino.io/docs/current/develop/client-protocol.html#important-queryresults-attributes). +/// +/// When reading objects, the order of the keys does not matter. When reading +/// lists, there must be the same number of entries as struct fields, and they +/// must be in the same order. Map columns are not affected by this option. 
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub enum StructMode { + #[default] + /// Encode/decode structs as objects (e.g., {"a": 1, "b": "c"}) + ObjectOnly, + /// Encode/decode structs as lists (e.g., [1, "c"]) + ListOnly, +} + /// Trait declaring any type that is serializable to JSON. This includes all primitive types (bool, i32, etc.). pub trait JsonSerializable: 'static { /// Converts self into json value if its possible @@ -156,4 +184,72 @@ mod tests { ); assert_eq!(None, f32::NAN.into_json_value()); } + + #[test] + fn test_json_roundtrip_structs() { + use crate::writer::LineDelimited; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Fields; + use arrow_schema::Schema; + use std::sync::Arc; + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "c1", + DataType::Struct(Fields::from(vec![ + Field::new("c11", DataType::Int32, true), + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()), + false, + ), + ])), + false, + ), + Field::new("c2", DataType::Utf8, false), + ])); + + { + let object_input = r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"} +{"c1":{"c12":{"c121":"f"}},"c2":"b"} +{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"} +"# + .as_bytes(); + let object_reader = ReaderBuilder::new(schema.clone()) + .with_struct_mode(StructMode::ObjectOnly) + .build(object_input) + .unwrap(); + + let mut object_output: Vec = Vec::new(); + let mut object_writer = WriterBuilder::new() + .with_struct_mode(StructMode::ObjectOnly) + .build::<_, LineDelimited>(&mut object_output); + for batch_res in object_reader { + object_writer.write(&batch_res.unwrap()).unwrap(); + } + assert_eq!(object_input, &object_output); + } + + { + let list_input = r#"[[1,["e"]],"a"] +[[null,["f"]],"b"] +[[5,["g"]],"c"] +"# + .as_bytes(); + let list_reader = ReaderBuilder::new(schema.clone()) + .with_struct_mode(StructMode::ListOnly) + .build(list_input) + .unwrap(); + + let mut list_output: Vec = Vec::new(); + let 
mut list_writer = WriterBuilder::new() + .with_struct_mode(StructMode::ListOnly) + .build::<_, LineDelimited>(&mut list_output); + for batch_res in list_reader { + list_writer.write(&batch_res.unwrap()).unwrap(); + } + assert_eq!(list_input, &list_output); + } + } } diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index b6f8c18ea9c3..1a1dee6a23d4 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -17,6 +17,7 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{make_decoder, ArrayDecoder}; +use crate::StructMode; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_array::OffsetSizeTrait; use arrow_buffer::buffer::NullBuffer; @@ -37,6 +38,7 @@ impl ListArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result { let field = match &data_type { DataType::List(f) if !O::IS_LARGE => f, @@ -48,6 +50,7 @@ impl ListArrayDecoder { coerce_primitive, strict_mode, field.is_nullable(), + struct_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index cd1ca5f71fa9..ee78373a551e 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -17,6 +17,7 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{make_decoder, ArrayDecoder}; +use crate::StructMode; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_buffer::buffer::NullBuffer; use arrow_buffer::ArrowNativeType; @@ -36,6 +37,7 @@ impl MapArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result { let fields = match &data_type { DataType::Map(_, true) => { @@ -59,12 +61,14 @@ impl MapArrayDecoder { coerce_primitive, strict_mode, fields[0].is_nullable(), + struct_mode, )?; let values = make_decoder( fields[1].data_type().clone(), coerce_primitive, strict_mode, 
fields[1].is_nullable(), + struct_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index f857e8813c7e..d8fa54de6042 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -133,6 +133,7 @@ //! ``` //! +use crate::StructMode; use std::io::BufRead; use std::sync::Arc; @@ -176,6 +177,7 @@ pub struct ReaderBuilder { coerce_primitive: bool, strict_mode: bool, is_field: bool, + struct_mode: StructMode, schema: SchemaRef, } @@ -195,6 +197,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: false, + struct_mode: Default::default(), schema, } } @@ -235,6 +238,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: true, + struct_mode: Default::default(), schema: Arc::new(Schema::new([field.into()])), } } @@ -253,8 +257,11 @@ impl ReaderBuilder { } } - /// Sets if the decoder should return an error if it encounters a column not present - /// in `schema` + /// Sets if the decoder should return an error if it encounters a column not + /// present in `schema`. If `struct_mode` is `ListOnly`, then it is required + /// for all fields of the struct to be in the list, regardless of the value + /// of `strict_mode`. Without field names, there is no way to determine + /// which field is missing. pub fn with_strict_mode(self, strict_mode: bool) -> Self { Self { strict_mode, @@ -262,6 +269,16 @@ impl ReaderBuilder { } } + /// Set the [`StructMode`] for the reader, which determines whether structs + /// can be decoded from JSON as objects or lists. For more details refer to + /// the enum documentation. Default is to use `ObjectOnly`. 
+ pub fn with_struct_mode(self, struct_mode: StructMode) -> Self { + Self { + struct_mode, + ..self + } + } + /// Create a [`Reader`] with the provided [`BufRead`] pub fn build(self, reader: R) -> Result, ArrowError> { Ok(Reader { @@ -280,7 +297,13 @@ impl ReaderBuilder { } }; - let decoder = make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?; + let decoder = make_decoder( + data_type, + self.coerce_primitive, + self.strict_mode, + nullable, + self.struct_mode, + )?; let num_fields = self.schema.flattened_fields().len(); @@ -643,6 +666,7 @@ fn make_decoder( coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result, ArrowError> { downcast_integer! { data_type => (primitive_decoder, data_type), @@ -693,13 +717,13 @@ fn make_decoder( DataType::Boolean => Ok(Box::::default()), DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), + DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => { Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON"))) } - DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, 
coerce_primitive, strict_mode, is_nullable)?)), + DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader"))) } } @@ -715,7 +739,7 @@ mod tests { use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; use arrow_data::ArrayDataBuilder; - use arrow_schema::Field; + use arrow_schema::{Field, Fields}; use super::*; @@ -2343,4 +2367,246 @@ mod tests { .unwrap() ); } + + #[test] + fn test_struct_decoding_list_length() { + use arrow_array::array; + + let row = "[1, 2]"; + + let mut fields = vec![Field::new("a", DataType::Int32, true)]; + let too_few_fields = Fields::from(fields.clone()); + fields.push(Field::new("b", DataType::Int32, true)); + let correct_fields = Fields::from(fields.clone()); + fields.push(Field::new("c", DataType::Int32, true)); + let too_many_fields = Fields::from(fields.clone()); + + let parse = |fields: Fields, as_field: bool| { + let builder = if as_field { + ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true)) + } else { + ReaderBuilder::new(Arc::new(Schema::new(fields))) + }; + builder + .with_struct_mode(StructMode::ListOnly) + .build(Cursor::new(row.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + let expected_row = StructArray::new( + correct_fields.clone(), + vec![ + Arc::new(array::Int32Array::from(vec![1])), + Arc::new(array::Int32Array::from(vec![2])), + ], + None, + ); + let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true); + + assert_eq!( + parse(too_few_fields.clone(), true).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(too_few_fields, false).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(correct_fields.clone(), true).unwrap(), + RecordBatch::try_new( 
+ Arc::new(Schema::new(vec![row_field])), + vec![Arc::new(expected_row.clone())] + ) + .unwrap() + ); + assert_eq!( + parse(correct_fields, false).unwrap(), + RecordBatch::from(expected_row) + ); + assert_eq!( + parse(too_many_fields.clone(), true) + .unwrap_err() + .to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + assert_eq!( + parse(too_many_fields, false).unwrap_err().to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + } + + #[test] + fn test_struct_decoding() { + use arrow_array::builder; + + let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#; + let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#; + let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#; + + let struct_fields = Fields::from(vec![ + Field::new("b", DataType::new_list(DataType::Int32, true), true), + Field::new_map( + "c", + "entries", + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + false, + false, + ), + ]); + + let list_array = + ListArray::from_iter_primitive::(vec![Some(vec![Some(1), Some(2)])]); + + let map_array = { + let mut map_builder = builder::MapBuilder::new( + None, + builder::StringBuilder::new(), + builder::Int32Builder::new(), + ); + map_builder.keys().append_value("d"); + map_builder.values().append_value(3); + map_builder.append(true).unwrap(); + map_builder.finish() + }; + + let struct_array = StructArray::new( + struct_fields.clone(), + vec![Arc::new(list_array), Arc::new(map_array)], + None, + ); + + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Struct(struct_fields), + true, + )])); + let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap(); + + let parse = |s: &str, mode: StructMode| { + ReaderBuilder::new(schema.clone()) + .with_struct_mode(mode) + .build(Cursor::new(s.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + assert_eq!( + parse(nested_object_json, StructMode::ObjectOnly).unwrap(), + expected 
+ ); + assert_eq!( + parse(nested_list_json, StructMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned() + ); + + assert_eq!( + parse(nested_list_json, StructMode::ListOnly).unwrap(), + expected + ); + assert_eq!( + parse(nested_object_json, StructMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned() + ); + } + + // Test cases: + // [] -> RecordBatch row with no entries. Schema = [('a', Int32)] -> Error + // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error + // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error + // [[]] -> RecordBatch row with empty struct entry. 
Schema = [('r', [('a', Int32)])] -> Error + #[test] + fn test_struct_decoding_empty_list() { + let int_field = Field::new("a", DataType::Int32, true); + let struct_field = Field::new( + "r", + DataType::Struct(Fields::from(vec![int_field.clone()])), + true, + ); + + let parse = |json: &str, as_field: bool, field: Field| { + let builder = if as_field { + ReaderBuilder::new_with_field(field.clone()) + } else { + ReaderBuilder::new(Arc::new(Schema::new(vec![field].clone()))) + }; + builder + .with_struct_mode(StructMode::ListOnly) + .build(Cursor::new(json.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + // Missing fields + assert_eq!( + parse("[]", true, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, int_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[[]]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned() + ); + + // Wrong values + assert_eq!( + parse(r#"["a"]"#, true, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned() + ); + assert_eq!( + parse(r#"["a"]"#, false, int_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned() + ); + assert_eq!( + parse(r#"[["a"]]"#, false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned() + ); + assert_eq!( + parse(r#"["a"]"#, true, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'a': 
failed to parse \"a\" as Int32".to_owned() + ); + } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 6c805591d390..c0c2e8c6cbd4 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -16,7 +16,7 @@ // under the License. use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructMode}; use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; @@ -27,6 +27,7 @@ pub struct StructArrayDecoder { decoders: Vec>, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, } impl StructArrayDecoder { @@ -35,6 +36,7 @@ impl StructArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result { let decoders = struct_fields(&data_type) .iter() @@ -48,6 +50,7 @@ impl StructArrayDecoder { coerce_primitive, strict_mode, nullable, + struct_mode, ) }) .collect::, ArrowError>>()?; @@ -57,6 +60,7 @@ impl StructArrayDecoder { decoders, strict_mode, is_nullable, + struct_mode, }) } } @@ -70,43 +74,84 @@ impl ArrayDecoder for StructArrayDecoder { .is_nullable .then(|| BooleanBufferBuilder::new(pos.len())); - for (row, p) in pos.iter().enumerate() { - let end_idx = match (tape.get(*p), nulls.as_mut()) { - (TapeElement::StartObject(end_idx), None) => end_idx, - (TapeElement::StartObject(end_idx), Some(nulls)) => { - nulls.append(true); - end_idx - } - (TapeElement::Null, Some(nulls)) => { - nulls.append(false); - continue; + match self.struct_mode { + StructMode::ObjectOnly => { + for (row, p) in pos.iter().enumerate() { + let end_idx = match (tape.get(*p), nulls.as_mut()) { + (TapeElement::StartObject(end_idx), None) => end_idx, + (TapeElement::StartObject(end_idx), Some(nulls)) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls)) => { + 
nulls.append(false); + continue; + } + (_, _) => return Err(tape.error(*p, "{")), + }; + + let mut cur_idx = *p + 1; + while cur_idx < end_idx { + // Read field name + let field_name = match tape.get(cur_idx) { + TapeElement::String(s) => tape.get_string(s), + _ => return Err(tape.error(cur_idx, "field name")), + }; + + // Update child pos if match found + match fields.iter().position(|x| x.name() == field_name) { + Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, + None => { + if self.strict_mode { + return Err(ArrowError::JsonError(format!( + "column '{}' missing from schema", + field_name + ))); + } + } + } + // Advance to next field + cur_idx = tape.next(cur_idx + 1, "field value")?; + } } - _ => return Err(tape.error(*p, "{")), - }; - - let mut cur_idx = *p + 1; - while cur_idx < end_idx { - // Read field name - let field_name = match tape.get(cur_idx) { - TapeElement::String(s) => tape.get_string(s), - _ => return Err(tape.error(cur_idx, "field name")), - }; - - // Update child pos if match found - match fields.iter().position(|x| x.name() == field_name) { - Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, - None => { - if self.strict_mode { + } + StructMode::ListOnly => { + for (row, p) in pos.iter().enumerate() { + let end_idx = match (tape.get(*p), nulls.as_mut()) { + (TapeElement::StartList(end_idx), None) => end_idx, + (TapeElement::StartList(end_idx), Some(nulls)) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls)) => { + nulls.append(false); + continue; + } + (_, _) => return Err(tape.error(*p, "[")), + }; + + let mut cur_idx = *p + 1; + let mut entry_idx = 0; + while cur_idx < end_idx { + if entry_idx >= fields.len() { return Err(ArrowError::JsonError(format!( - "column '{}' missing from schema", - field_name + "found extra columns for {} fields", + fields.len() ))); } + child_pos[entry_idx][row] = cur_idx; + entry_idx += 1; + // Advance to next field + cur_idx = tape.next(cur_idx, "field value")?; + } + 
if entry_idx != fields.len() { + return Err(ArrowError::JsonError(format!( + "found {} columns for {} fields", + entry_idx, + fields.len() + ))); } } - - // Advance to next field - cur_idx = tape.next(cur_idx + 1, "field value")?; } } diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index ed430fe6a1ec..0b3c788d5519 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::StructMode; use arrow_array::cast::AsArray; use arrow_array::types::*; use arrow_array::*; @@ -29,6 +30,7 @@ use std::io::Write; #[derive(Debug, Clone, Default)] pub struct EncoderOptions { pub explicit_nulls: bool, + pub struct_mode: StructMode, } /// A trait to format array values as JSON values @@ -135,6 +137,7 @@ fn make_encoder_impl<'a>( let encoder = StructArrayEncoder{ encoders, explicit_nulls: options.explicit_nulls, + struct_mode: options.struct_mode, }; (Box::new(encoder) as _, array.nulls().cloned()) } @@ -172,6 +175,7 @@ struct FieldEncoder<'a> { struct StructArrayEncoder<'a> { encoders: Vec>, explicit_nulls: bool, + struct_mode: StructMode, } /// This API is only stable since 1.70 so can't use it when current MSRV is lower @@ -185,11 +189,16 @@ fn is_some_and(opt: Option, f: impl FnOnce(T) -> bool) -> bool { impl Encoder for StructArrayEncoder<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { - out.push(b'{'); + match self.struct_mode { + StructMode::ObjectOnly => out.push(b'{'), + StructMode::ListOnly => out.push(b'['), + } let mut is_first = true; + // Nulls can only be dropped in explicit mode + let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls; for field_encoder in &mut self.encoders { let is_null = is_some_and(field_encoder.nulls.as_ref(), |n| n.is_null(idx)); - if is_null && !self.explicit_nulls { + if drop_nulls && is_null { continue; } @@ -198,15 +207,20 @@ impl 
Encoder for StructArrayEncoder<'_> { } is_first = false; - encode_string(field_encoder.field.name(), out); - out.push(b':'); + if self.struct_mode == StructMode::ObjectOnly { + encode_string(field_encoder.field.name(), out); + out.push(b':'); + } match is_null { true => out.extend_from_slice(b"null"), false => field_encoder.encoder.encode(idx, out), } } - out.push(b'}'); + match self.struct_mode { + StructMode::ObjectOnly => out.push(b'}'), + StructMode::ListOnly => out.push(b']'), + } } } diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index ee6d83a0a1f0..5d3e558480ca 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -108,6 +108,7 @@ mod encoder; use std::{fmt::Debug, io::Write}; +use crate::StructMode; use arrow_array::*; use arrow_schema::*; @@ -247,12 +248,28 @@ impl WriterBuilder { /// {"foo":null,"bar":null} /// ``` /// - /// Default is to skip nulls (set to `false`). + /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`, + /// nulls will be written explicitly regardless of this setting. pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self { self.0.explicit_nulls = explicit_nulls; self } + /// Returns if this writer is configured to write structs as JSON Objects or Arrays. + pub fn struct_mode(&self) -> StructMode { + self.0.struct_mode + } + + /// Set the [`StructMode`] for the writer, which determines whether structs + /// are encoded to JSON as objects or lists. For more details refer to the + /// enum documentation. Default is to use `ObjectOnly`. If this is set to + /// `ListOnly`, nulls will be written explicitly regardless of the + /// `explicit_nulls` setting. + pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self { + self.0.struct_mode = struct_mode; + self + } + /// Create a new `Writer` with specified `JsonFormat` and builder options. 
pub fn build(self, writer: W) -> Writer where @@ -1953,4 +1970,70 @@ mod tests { "#, ); } + + #[test] + fn write_structs_as_list() { + let schema = Schema::new(vec![ + Field::new( + "c1", + DataType::Struct(Fields::from(vec![ + Field::new("c11", DataType::Int32, true), + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()), + false, + ), + ])), + false, + ), + Field::new("c2", DataType::Utf8, false), + ]); + + let c1 = StructArray::from(vec![ + ( + Arc::new(Field::new("c11", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef, + ), + ( + Arc::new(Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()), + false, + )), + Arc::new(StructArray::from(vec![( + Arc::new(Field::new("c121", DataType::Utf8, false)), + Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef, + )])) as ArrayRef, + ), + ]); + let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]); + + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + + let expected = r#"[[1,["e"]],"a"] +[[null,["f"]],"b"] +[[5,["g"]],"c"] +"#; + + let mut buf = Vec::new(); + { + let builder = WriterBuilder::new() + .with_explicit_nulls(true) + .with_struct_mode(StructMode::ListOnly); + let mut writer = builder.build::<_, LineDelimited>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + } + assert_json_eq(&buf, expected); + + let mut buf = Vec::new(); + { + let builder = WriterBuilder::new() + .with_explicit_nulls(false) + .with_struct_mode(StructMode::ListOnly); + let mut writer = builder.build::<_, LineDelimited>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + } + assert_json_eq(&buf, expected); + } }