Skip to content

Commit

Permalink
feat: add ignore_nulls option to struct.json_encode
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilramolla committed Jul 26, 2024
1 parent 3016c07 commit 045b7f6
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/polars-json/src/json/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<'a> RecordSerializer<'a> {
let iterators = chunk
.arrays()
.iter()
.map(|arr| new_serializer(arr.as_ref(), 0, usize::MAX))
.map(|arr| new_serializer(arr.as_ref(), false, 0, usize::MAX))
.collect();

Self {
Expand Down
24 changes: 17 additions & 7 deletions crates/polars-json/src/json/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ fn utf8view_serializer<'a>(

fn struct_serializer<'a>(
array: &'a StructArray,
ignore_nulls: bool,
offset: usize,
take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
Expand All @@ -195,7 +196,7 @@ fn struct_serializer<'a>(
.values()
.iter()
.map(|x| x.as_ref())
.map(|arr| new_serializer(arr, offset, take))
.map(|arr| new_serializer(arr, ignore_nulls, offset, take))
.collect::<Vec<_>>();

Box::new(BufStreamingIterator::new(
Expand All @@ -211,6 +212,7 @@ fn struct_serializer<'a>(
.map(|serializer| serializer.next().unwrap()),
),
true,
ignore_nulls,
);
} else {
serializers.iter_mut().for_each(|iter| {
Expand All @@ -237,7 +239,7 @@ fn list_serializer<'a, O: Offset>(
let offsets = array.offsets().as_slice();
let start = offsets[0].to_usize();
let end = offsets.last().unwrap().to_usize();
let mut serializer = new_serializer(array.values().as_ref(), start, end - start);
let mut serializer = new_serializer(array.values().as_ref(), false, start, end - start);

let f = move |offset: Option<&[O]>, buf: &mut Vec<u8>| {
if let Some(offset) = offset {
Expand Down Expand Up @@ -267,7 +269,7 @@ fn fixed_size_list_serializer<'a>(
offset: usize,
take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
let mut serializer = new_serializer(array.values().as_ref(), offset, take);
let mut serializer = new_serializer(array.values().as_ref(), false, offset, take);

Box::new(BufStreamingIterator::new(
ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())),
Expand Down Expand Up @@ -403,6 +405,7 @@ fn timestamp_tz_serializer<'a>(

pub(crate) fn new_serializer<'a>(
array: &'a dyn Array,
ignore_nulls: bool,
offset: usize,
take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
Expand Down Expand Up @@ -450,9 +453,12 @@ pub(crate) fn new_serializer<'a>(
ArrowDataType::Utf8View => {
utf8view_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::Struct(_) => {
struct_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::Struct(_) => struct_serializer(
array.as_any().downcast_ref().unwrap(),
ignore_nulls,
offset,
take,
),
ArrowDataType::FixedSizeList(_, _) => {
fixed_size_list_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
Expand Down Expand Up @@ -522,13 +528,17 @@ fn serialize_item<'a>(
buffer: &mut Vec<u8>,
record: impl Iterator<Item = (&'a str, &'a [u8])>,
is_first_row: bool,
ignore_nulls: bool,
) {
if !is_first_row {
buffer.push(b',');
}
buffer.push(b'{');
let mut first_item = true;
for (key, value) in record {
if ignore_nulls && value == b"null" {
continue;
}
if !first_item {
buffer.push(b',');
}
Expand All @@ -544,7 +554,7 @@ fn serialize_item<'a>(
/// # Implementation
/// This operation is CPU-bounded
pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
let mut serializer = new_serializer(array, 0, usize::MAX);
let mut serializer = new_serializer(array, false, 0, usize::MAX);

(0..array.len()).for_each(|i| {
if i != 0 {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-json/src/json/write/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ where
writer.write_all(s)
}

pub fn serialize_to_utf8(array: &dyn Array) -> Utf8ViewArray {
pub fn serialize_to_utf8(array: &dyn Array, ignore_nulls: bool) -> Utf8ViewArray {
let mut values = MutableBinaryViewArray::with_capacity(array.len());
let mut serializer = new_serializer(array, 0, usize::MAX);
let mut serializer = new_serializer(array, ignore_nulls, 0, usize::MAX);

while let Some(v) = serializer.next() {
unsafe { values.push_value(std::str::from_utf8_unchecked(v)) }
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-json/src/ndjson/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use polars_error::{PolarsError, PolarsResult};
use super::super::json::write::new_serializer;

fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
let mut serializer = new_serializer(array, 0, usize::MAX);
let mut serializer = new_serializer(array, false, 0, usize::MAX);
(0..array.len()).for_each(|_| {
buffer.extend_from_slice(serializer.next().unwrap());
buffer.push(b'\n');
Expand Down
14 changes: 8 additions & 6 deletions crates/polars-plan/src/dsl/function_expr/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ pub enum StructFunction {
PrefixFields(Arc<str>),
SuffixFields(Arc<str>),
#[cfg(feature = "json")]
JsonEncode,
JsonEncode {
ignore_nulls: bool,
},
WithFields,
MultipleFields(Arc<[ColumnName]>),
}
Expand Down Expand Up @@ -91,7 +93,7 @@ impl StructFunction {
_ => polars_bail!(op = "suffix_fields", got = dt, expected = "Struct"),
}),
#[cfg(feature = "json")]
JsonEncode => mapper.with_dtype(DataType::String),
JsonEncode { .. } => mapper.with_dtype(DataType::String),
WithFields => {
let args = mapper.args();
let struct_ = &args[0];
Expand Down Expand Up @@ -134,7 +136,7 @@ impl Display for StructFunction {
PrefixFields(_) => write!(f, "name.prefix_fields"),
SuffixFields(_) => write!(f, "name.suffixFields"),
#[cfg(feature = "json")]
JsonEncode => write!(f, "struct.to_json"),
JsonEncode { .. } => write!(f, "struct.to_json"),
WithFields => write!(f, "with_fields"),
MultipleFields(_) => write!(f, "multiple_fields"),
}
Expand All @@ -151,7 +153,7 @@ impl From<StructFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
PrefixFields(prefix) => map!(prefix_fields, prefix.clone()),
SuffixFields(suffix) => map!(suffix_fields, suffix.clone()),
#[cfg(feature = "json")]
JsonEncode => map!(to_json),
JsonEncode { ignore_nulls } => map!(to_json, ignore_nulls),
WithFields => map_as_slice!(with_fields),
MultipleFields(_) => unimplemented!(),
}
Expand Down Expand Up @@ -215,13 +217,13 @@ pub(super) fn suffix_fields(s: &Series, suffix: Arc<str>) -> PolarsResult<Series
}

#[cfg(feature = "json")]
pub(super) fn to_json(s: &Series) -> PolarsResult<Series> {
pub(super) fn to_json(s: &Series, ignore_nulls: bool) -> PolarsResult<Series> {
let ca = s.struct_()?;
let dtype = ca.dtype().to_arrow(CompatLevel::newest());

let iter = ca.chunks().iter().map(|arr| {
let arr = arrow::compute::cast::cast_unchecked(arr.as_ref(), &dtype).unwrap();
polars_json::json::write::serialize_to_utf8(arr.as_ref())
polars_json::json::write::serialize_to_utf8(arr.as_ref(), ignore_nulls)
});

Ok(StringChunked::from_chunk_iter(ca.name(), iter).into_series())
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-plan/src/dsl/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ impl StructNameSpace {
}

#[cfg(feature = "json")]
/// Build an expression that JSON-encodes this struct expression.
///
/// The wrapped expression is mapped through
/// `FunctionExpr::StructExpr(StructFunction::JsonEncode { .. })`.
/// `ignore_nulls` is stored on the `JsonEncode` variant so the flag is
/// available when the function is executed.
pub fn json_encode(self) -> Expr {
pub fn json_encode(self, ignore_nulls: bool) -> Expr {
self.0
.map_private(FunctionExpr::StructExpr(StructFunction::JsonEncode))
.map_private(FunctionExpr::StructExpr(StructFunction::JsonEncode {
ignore_nulls,
}))
}

pub fn with_fields(self, fields: Vec<Expr>) -> PolarsResult<Expr> {
Expand Down
29 changes: 27 additions & 2 deletions py-polars/polars/expr/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,21 @@ def rename_fields(self, names: Sequence[str]) -> Expr:
"""
return wrap_expr(self._pyexpr.struct_rename_fields(names))

def json_encode(self) -> Expr:
def json_encode(self, *, ignore_nulls: bool = False) -> Expr:
"""
Convert this struct to a string column with json values.
Parameters
----------
ignore_nulls
Ignore missing values in the struct when serializing.
- When `ignore_nulls=False`, struct fields whose value is null are still
written to the output, encoded as `null`.
- When `ignore_nulls=True`, struct fields whose value is null are omitted
from the output entirely.
Examples
--------
>>> pl.DataFrame(
Expand All @@ -232,8 +243,22 @@ def json_encode(self) -> Expr:
│ {[1, 2],[45]} ┆ {"a":[1,2],"b":[45]} │
│ {[9, 1, 3],null} ┆ {"a":[9,1,3],"b":null} │
└──────────────────┴────────────────────────┘
>>> pl.DataFrame(
... {"a": [{"a": [1, 2], "b": [45]}, {"a": [9, 1, 3], "b": None}]}
... ).with_columns(
... pl.col("a").struct.json_encode(ignore_nulls=True).alias("encoded")
... )
shape: (2, 2)
┌──────────────────┬────────────────────────┐
│ a ┆ encoded │
│ --- ┆ --- │
│ struct[2] ┆ str │
╞══════════════════╪════════════════════════╡
│ {[1, 2],[45]} ┆ {"a":[1,2],"b":[45]} │
│ {[9, 1, 3],null} ┆ {"a":[9,1,3]} │
└──────────────────┴────────────────────────┘
"""
return wrap_expr(self._pyexpr.struct_json_encode())
return wrap_expr(self._pyexpr.struct_json_encode(ignore_nulls))

def with_fields(
self,
Expand Down
8 changes: 6 additions & 2 deletions py-polars/src/expr/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ impl PyExpr {
}

#[cfg(feature = "json")]
/// Python-facing wrapper around the struct namespace's `json_encode`.
///
/// Clones the inner expression, calls `struct_().json_encode(..)` on it,
/// and converts the result back into `Self`. `ignore_nulls` is passed
/// through unchanged.
fn struct_json_encode(&self) -> Self {
self.inner.clone().struct_().json_encode().into()
fn struct_json_encode(&self, ignore_nulls: bool) -> Self {
self.inner
.clone()
.struct_()
.json_encode(ignore_nulls)
.into()
}

fn struct_with_fields(&self, fields: Vec<PyExpr>) -> PyResult<Self> {
Expand Down

0 comments on commit 045b7f6

Please sign in to comment.