feat: Add ignore_nulls option to struct.json_encode #17867

Open · wants to merge 1 commit into base: main
2 changes: 1 addition & 1 deletion crates/polars-json/src/json/write/mod.rs
@@ -88,7 +88,7 @@ impl<'a> RecordSerializer<'a> {
let iterators = chunk
.arrays()
.iter()
.map(|arr| new_serializer(arr.as_ref(), 0, usize::MAX))
.map(|arr| new_serializer(arr.as_ref(), false, 0, usize::MAX))
.collect();

Self {
24 changes: 17 additions & 7 deletions crates/polars-json/src/json/write/serialize.rs
@@ -181,6 +181,7 @@ fn utf8view_serializer<'a>(

fn struct_serializer<'a>(
array: &'a StructArray,
ignore_nulls: bool,
offset: usize,
take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
@@ -195,7 +196,7 @@ fn struct_serializer<'a>(
.values()
.iter()
.map(|x| x.as_ref())
.map(|arr| new_serializer(arr, offset, take))
.map(|arr| new_serializer(arr, ignore_nulls, offset, take))
.collect::<Vec<_>>();

Box::new(BufStreamingIterator::new(
@@ -211,6 +212,7 @@ fn struct_serializer<'a>(
.map(|serializer| serializer.next().unwrap()),
),
true,
ignore_nulls,
);
} else {
serializers.iter_mut().for_each(|iter| {
@@ -237,7 +239,7 @@ fn list_serializer<'a, O: Offset>(
let offsets = array.offsets().as_slice();
let start = offsets[0].to_usize();
let end = offsets.last().unwrap().to_usize();
let mut serializer = new_serializer(array.values().as_ref(), start, end - start);
let mut serializer = new_serializer(array.values().as_ref(), false, start, end - start);

let f = move |offset: Option<&[O]>, buf: &mut Vec<u8>| {
if let Some(offset) = offset {
@@ -267,7 +269,7 @@ fn fixed_size_list_serializer<'a>(
offset: usize,
take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
let mut serializer = new_serializer(array.values().as_ref(), offset, take);
let mut serializer = new_serializer(array.values().as_ref(), false, offset, take);

Box::new(BufStreamingIterator::new(
ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())),
@@ -403,6 +405,7 @@ fn timestamp_tz_serializer<'a>(

pub(crate) fn new_serializer<'a>(
array: &'a dyn Array,
ignore_nulls: bool,
offset: usize,
take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
@@ -450,9 +453,12 @@ pub(crate) fn new_serializer<'a>(
ArrowDataType::Utf8View => {
utf8view_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::Struct(_) => {
struct_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::Struct(_) => struct_serializer(
array.as_any().downcast_ref().unwrap(),
ignore_nulls,
offset,
take,
),
ArrowDataType::FixedSizeList(_, _) => {
fixed_size_list_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
@@ -522,13 +528,17 @@ fn serialize_item<'a>(
buffer: &mut Vec<u8>,
record: impl Iterator<Item = (&'a str, &'a [u8])>,
is_first_row: bool,
ignore_nulls: bool,
) {
if !is_first_row {
buffer.push(b',');
}
buffer.push(b'{');
let mut first_item = true;
for (key, value) in record {
if ignore_nulls && value == b"null" {
continue;
}
if !first_item {
buffer.push(b',');
}
@@ -544,7 +554,7 @@ fn serialize_item<'a>(
/// # Implementation
/// This operation is CPU-bound
pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
let mut serializer = new_serializer(array, 0, usize::MAX);
let mut serializer = new_serializer(array, false, 0, usize::MAX);

(0..array.len()).for_each(|i| {
if i != 0 {
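For context, here is a minimal, self-contained sketch (not the Polars implementation itself) of the rule the `serialize_item` change above applies: when `ignore_nulls` is true, any field whose value has already been serialized to the literal bytes `null` is skipped entirely, and the `first_item` flag keeps the commas between the remaining fields correct. The helper name and the sample record below are made up for illustration.

```rust
// Hypothetical, self-contained illustration of the `ignore_nulls` rule in
// `serialize_item`; the helper name and sample record are made up.
fn serialize_object(record: &[(&str, &[u8])], ignore_nulls: bool) -> Vec<u8> {
    let mut buffer = Vec::new();
    buffer.push(b'{');
    let mut first_item = true;
    for &(key, value) in record {
        // Drop the whole `"key":null` pair when nulls are ignored.
        if ignore_nulls && value == b"null" {
            continue;
        }
        if !first_item {
            buffer.push(b',');
        }
        first_item = false;
        buffer.push(b'"');
        buffer.extend_from_slice(key.as_bytes());
        buffer.extend_from_slice(b"\":");
        buffer.extend_from_slice(value);
    }
    buffer.push(b'}');
    buffer
}

fn main() {
    // Each value is the already-serialized JSON for one struct field.
    let record: Vec<(&str, &[u8])> = vec![
        ("a", b"[9,1,3]".as_slice()),
        ("b", b"null".as_slice()),
    ];
    let kept = String::from_utf8(serialize_object(&record, false)).unwrap();
    let dropped = String::from_utf8(serialize_object(&record, true)).unwrap();
    assert_eq!(kept, r#"{"a":[9,1,3],"b":null}"#);
    assert_eq!(dropped, r#"{"a":[9,1,3]}"#);
}
```

As the diff shows, `struct_serializer` propagates the flag to its field serializers, while `list_serializer` and `fixed_size_list_serializer` pass `false` for their element serializers, so only struct fields are affected by the option.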
4 changes: 2 additions & 2 deletions crates/polars-json/src/json/write/utf8.rs
@@ -141,9 +141,9 @@
writer.write_all(s)
}

pub fn serialize_to_utf8(array: &dyn Array) -> Utf8ViewArray {
pub fn serialize_to_utf8(array: &dyn Array, ignore_nulls: bool) -> Utf8ViewArray {
let mut values = MutableBinaryViewArray::with_capacity(array.len());
let mut serializer = new_serializer(array, 0, usize::MAX);
let mut serializer = new_serializer(array, ignore_nulls, 0, usize::MAX);

while let Some(v) = serializer.next() {
unsafe { values.push_value(std::str::from_utf8_unchecked(v)) }
2 changes: 1 addition & 1 deletion crates/polars-json/src/ndjson/write.rs
@@ -8,7 +8,7 @@ use polars_error::{PolarsError, PolarsResult};
use super::super::json::write::new_serializer;

fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
let mut serializer = new_serializer(array, 0, usize::MAX);
let mut serializer = new_serializer(array, false, 0, usize::MAX);
(0..array.len()).for_each(|_| {
buffer.extend_from_slice(serializer.next().unwrap());
buffer.push(b'\n');
14 changes: 8 additions & 6 deletions crates/polars-plan/src/dsl/function_expr/struct_.rs
@@ -12,7 +12,9 @@ pub enum StructFunction {
PrefixFields(Arc<str>),
SuffixFields(Arc<str>),
#[cfg(feature = "json")]
JsonEncode,
JsonEncode {
ignore_nulls: bool,
},
WithFields,
MultipleFields(Arc<[ColumnName]>),
}
@@ -91,7 +93,7 @@ impl StructFunction {
_ => polars_bail!(op = "suffix_fields", got = dt, expected = "Struct"),
}),
#[cfg(feature = "json")]
JsonEncode => mapper.with_dtype(DataType::String),
JsonEncode { .. } => mapper.with_dtype(DataType::String),
WithFields => {
let args = mapper.args();
let struct_ = &args[0];
@@ -134,7 +136,7 @@ impl Display for StructFunction {
PrefixFields(_) => write!(f, "name.prefix_fields"),
SuffixFields(_) => write!(f, "name.suffix_fields"),
#[cfg(feature = "json")]
JsonEncode => write!(f, "struct.to_json"),
JsonEncode { .. } => write!(f, "struct.to_json"),
WithFields => write!(f, "with_fields"),
MultipleFields(_) => write!(f, "multiple_fields"),
}
@@ -151,7 +153,7 @@ impl From<StructFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
PrefixFields(prefix) => map!(prefix_fields, prefix.clone()),
SuffixFields(suffix) => map!(suffix_fields, suffix.clone()),
#[cfg(feature = "json")]
JsonEncode => map!(to_json),
JsonEncode { ignore_nulls } => map!(to_json, ignore_nulls),
WithFields => map_as_slice!(with_fields),
MultipleFields(_) => unimplemented!(),
}
@@ -215,13 +217,13 @@ pub(super) fn suffix_fields(s: &Series, suffix: Arc<str>) -> PolarsResult<Series
}

#[cfg(feature = "json")]
pub(super) fn to_json(s: &Series) -> PolarsResult<Series> {
pub(super) fn to_json(s: &Series, ignore_nulls: bool) -> PolarsResult<Series> {
let ca = s.struct_()?;
let dtype = ca.dtype().to_arrow(CompatLevel::newest());

let iter = ca.chunks().iter().map(|arr| {
let arr = arrow::compute::cast::cast_unchecked(arr.as_ref(), &dtype).unwrap();
polars_json::json::write::serialize_to_utf8(arr.as_ref())
polars_json::json::write::serialize_to_utf8(arr.as_ref(), ignore_nulls)
});

Ok(StringChunked::from_chunk_iter(ca.name(), iter).into_series())
6 changes: 4 additions & 2 deletions crates/polars-plan/src/dsl/struct_.rs
@@ -63,9 +63,11 @@ impl StructNameSpace {
}

#[cfg(feature = "json")]
pub fn json_encode(self) -> Expr {
pub fn json_encode(self, ignore_nulls: bool) -> Expr {
self.0
.map_private(FunctionExpr::StructExpr(StructFunction::JsonEncode))
.map_private(FunctionExpr::StructExpr(StructFunction::JsonEncode {
ignore_nulls,
}))
}

pub fn with_fields(self, fields: Vec<Expr>) -> PolarsResult<Expr> {
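On the Rust DSL side, the flag is now a positional argument on `StructNameSpace::json_encode`. A hedged usage sketch through the lazy API (assuming a crate built with the `lazy`, `dtype-struct`, and `json` features; the column name and helper function are hypothetical):

```rust
use polars::prelude::*;

// Hypothetical helper: encode the struct column "a" to JSON, dropping null fields.
fn encode_structs(df: DataFrame) -> PolarsResult<DataFrame> {
    df.lazy()
        .with_column(
            // `json_encode(true)` mirrors `ignore_nulls=True` on the Python side:
            // null fields are omitted from the serialized objects.
            col("a").struct_().json_encode(true).alias("encoded"),
        )
        .collect()
}
```

Call sites that do not expose the option, such as the ndjson writer above, keep passing `false`, so their output is unchanged.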
29 changes: 27 additions & 2 deletions py-polars/polars/expr/struct.py
@@ -214,10 +214,21 @@ def rename_fields(self, names: Sequence[str]) -> Expr:
"""
return wrap_expr(self._pyexpr.struct_rename_fields(names))

def json_encode(self) -> Expr:
def json_encode(self, *, ignore_nulls: bool = False) -> Expr:
"""
Convert this struct to a string column with JSON values.

Parameters
----------
ignore_nulls
Ignore missing values in the struct when serializing.

- When `ignore_nulls=False`, struct fields are written to the output even
  when their values are null.

- When `ignore_nulls=True`, struct fields whose values are null are omitted
  from the output.

Examples
--------
>>> pl.DataFrame(
Expand All @@ -232,8 +243,22 @@ def json_encode(self) -> Expr:
│ {[1, 2],[45]} ┆ {"a":[1,2],"b":[45]} │
│ {[9, 1, 3],null} ┆ {"a":[9,1,3],"b":null} │
└──────────────────┴────────────────────────┘
>>> pl.DataFrame(
... {"a": [{"a": [1, 2], "b": [45]}, {"a": [9, 1, 3], "b": None}]}
... ).with_columns(
... pl.col("a").struct.json_encode(ignore_nulls=True).alias("encoded")
... )
shape: (2, 2)
┌──────────────────┬────────────────────────┐
│ a ┆ encoded │
│ --- ┆ --- │
│ struct[2] ┆ str │
╞══════════════════╪════════════════════════╡
│ {[1, 2],[45]} ┆ {"a":[1,2],"b":[45]} │
│ {[9, 1, 3],null} ┆ {"a":[9,1,3]} │
└──────────────────┴────────────────────────┘
"""
return wrap_expr(self._pyexpr.struct_json_encode())
return wrap_expr(self._pyexpr.struct_json_encode(ignore_nulls))

def with_fields(
self,
8 changes: 6 additions & 2 deletions py-polars/src/expr/struct.rs
@@ -23,8 +23,12 @@ impl PyExpr {
}

#[cfg(feature = "json")]
fn struct_json_encode(&self) -> Self {
self.inner.clone().struct_().json_encode().into()
fn struct_json_encode(&self, ignore_nulls: bool) -> Self {
self.inner
.clone()
.struct_()
.json_encode(ignore_nulls)
.into()
}

fn struct_with_fields(&self, fields: Vec<PyExpr>) -> PyResult<Self> {