From 957b077c5e32ddcc3fbd8ff97117bcb7adfbab08 Mon Sep 17 00:00:00 2001 From: Konstantin Malanchev Date: Tue, 28 May 2024 16:58:27 -0400 Subject: [PATCH 1/3] Fix to_lists to work with chunked series --- src/nested_pandas/series/accessor.py | 16 +++++++--- src/nested_pandas/series/ext_array.py | 5 +++ tests/nested_pandas/series/test_accessor.py | 33 ++++++++++++++++++++ tests/nested_pandas/series/test_ext_array.py | 15 +++++++++ 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/nested_pandas/series/accessor.py b/src/nested_pandas/series/accessor.py index 3bd967b..d4b21b1 100644 --- a/src/nested_pandas/series/accessor.py +++ b/src/nested_pandas/series/accessor.py @@ -1,6 +1,7 @@ # Python 3.9 doesn't support "|" for types from __future__ import annotations +from collections import defaultdict from collections.abc import Generator, MutableMapping from typing import cast @@ -53,14 +54,19 @@ def to_lists(self, fields: list[str] | None = None) -> pd.DataFrame: if len(fields) == 0: raise ValueError("Cannot convert a struct with no fields to lists") - struct_array = cast(pa.StructArray, pa.array(self._series)) + list_chunks = defaultdict(list) + for chunk in self._series.array._pa_array.iterchunks(): + struct_array = cast(pa.StructArray, chunk) + for field in fields: + list_array = cast(pa.ListArray, struct_array.field(field)) + list_chunks[field].append(list_array) list_series = {} - for field in fields: - list_array = cast(pa.ListArray, struct_array.field(field)) + for field, chunks in list_chunks.items(): + chunked_array = pa.chunked_array(chunks) list_series[field] = pd.Series( - list_array, - dtype=pd.ArrowDtype(list_array.type), + chunked_array, + dtype=pd.ArrowDtype(chunked_array.type), index=self._series.index, name=field, copy=False, diff --git a/src/nested_pandas/series/ext_array.py b/src/nested_pandas/series/ext_array.py index 71f1777..8c6f400 100644 --- a/src/nested_pandas/series/ext_array.py +++ b/src/nested_pandas/series/ext_array.py @@ -621,6 
+621,11 @@ def flat_length(self) -> int: """Length of the flat arrays""" return sum(chunk.field(0).value_lengths().sum().as_py() for chunk in self._pa_array.iterchunks()) + @property + def num_chunks(self) -> int: + """Number of chunks in underlying pyarrow.ChunkedArray""" + return self._pa_array.num_chunks + def view_fields(self, fields: str | list[str]) -> Self: # type: ignore[name-defined] # noqa: F821 """Get a view of the series with only the specified fields diff --git a/tests/nested_pandas/series/test_accessor.py b/tests/nested_pandas/series/test_accessor.py index 5ee617d..7b5ed5b 100644 --- a/tests/nested_pandas/series/test_accessor.py +++ b/tests/nested_pandas/series/test_accessor.py @@ -70,6 +70,39 @@ def test_to_lists(): assert_frame_equal(lists, desired) +def test_to_lists_for_chunked_array(): + """ ""Test that the .nest.to_lists() when underlying array is pa.ChunkedArray.""" + struct_array = pa.StructArray.from_arrays( + arrays=[ + [np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])], + [np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])], + ], + names=["a", "b"], + ) + chunked_array = pa.chunked_array([struct_array] * 3) + assert chunked_array.length() == 6 + series = pd.Series(chunked_array, dtype=NestedDtype(chunked_array.type), index=[0, 1, 2, 3, 4, 5]) + assert series.array.num_chunks == 3 + + lists = series.nest.to_lists() + + desired = pd.DataFrame( + data={ + "a": pd.Series( + data=[np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])] * 3, + dtype=pd.ArrowDtype(pa.list_(pa.float64())), + index=[0, 1, 2, 3, 4, 5], + ), + "b": pd.Series( + data=[np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])] * 3, + dtype=pd.ArrowDtype(pa.list_(pa.float64())), + index=[0, 1, 2, 3, 4, 5], + ), + }, + ) + assert_frame_equal(lists, desired) + + def test_to_lists_with_fields(): """Test that the .nest.to_lists(fields=...) 
method works.""" struct_array = pa.StructArray.from_arrays( diff --git a/tests/nested_pandas/series/test_ext_array.py b/tests/nested_pandas/series/test_ext_array.py index b263240..d04fd2c 100644 --- a/tests/nested_pandas/series/test_ext_array.py +++ b/tests/nested_pandas/series/test_ext_array.py @@ -1207,6 +1207,21 @@ def test_flat_length(): assert ext_array.flat_length == 7 +def test_num_chunks(): + """Tests .num_chunks property.""" + struct_array = pa.StructArray.from_arrays( + arrays=[ + pa.array([np.array([1.0, 2.0, 3.0]), np.array([1.0, 2.0, 1.0, 2.0])]), + pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0, 6.0])]), + ], + names=["a", "b"], + ) + chunked_array = pa.chunked_array([struct_array] * 7) + ext_array = NestedExtensionArray(chunked_array) + + assert ext_array.num_chunks == 7 + + def test_view_fields_with_single_field(): """Tests ext_array.view("field")""" arrays = [ From eaddd7312c88596ff815a42a80be53b6408ced71 Mon Sep 17 00:00:00 2001 From: Konstantin Malanchev Date: Tue, 28 May 2024 17:39:32 -0400 Subject: [PATCH 2/3] Fix to_flat to work with chunked series --- src/nested_pandas/series/accessor.py | 22 ++++++----- src/nested_pandas/series/ext_array.py | 25 +++++++++++-- tests/nested_pandas/series/test_accessor.py | 39 +++++++++++++++++++- tests/nested_pandas/series/test_ext_array.py | 20 +++++++++- 4 files changed, 90 insertions(+), 16 deletions(-) diff --git a/src/nested_pandas/series/accessor.py b/src/nested_pandas/series/accessor.py index d4b21b1..85ca6dc 100644 --- a/src/nested_pandas/series/accessor.py +++ b/src/nested_pandas/series/accessor.py @@ -91,21 +91,25 @@ def to_flat(self, fields: list[str] | None = None) -> pd.DataFrame: if len(fields) == 0: raise ValueError("Cannot flatten a struct with no fields") - struct_array = cast(pa.StructArray, pa.array(self._series)) + index = pd.Series(self.get_flat_index(), name=self._series.index.name) + + flat_chunks = defaultdict(list) + for chunk in 
self._series.array._pa_array.iterchunks(): + struct_array = cast(pa.StructArray, chunk) + for field in fields: + list_array = cast(pa.ListArray, struct_array.field(field)) + flat_array = list_array.flatten() + flat_chunks[field].append(flat_array) flat_series = {} - index = None - for field in fields: - list_array = cast(pa.ListArray, struct_array.field(field)) - flat_array = list_array.flatten() - if index is None: - index = self.get_flat_index() + for field, chunks in flat_chunks.items(): + chunked_array = pa.chunked_array(chunks) flat_series[field] = pd.Series( - flat_array, + chunked_array, index=pd.Series(index, name=self._series.index.name), name=field, copy=False, - dtype=pd.ArrowDtype(flat_array.type), + dtype=pd.ArrowDtype(chunked_array.type), ) return pd.DataFrame(flat_series) diff --git a/src/nested_pandas/series/ext_array.py b/src/nested_pandas/series/ext_array.py index 8c6f400..e915cdd 100644 --- a/src/nested_pandas/series/ext_array.py +++ b/src/nested_pandas/series/ext_array.py @@ -598,7 +598,7 @@ def _replace_pa_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> Non self._dtype = NestedDtype(pa_array.chunk(0).type) @property - def list_offsets(self) -> pa.ChunkedArray: + def list_offsets(self) -> pa.Array: """The list offsets of the field arrays. It is a chunk array of list offsets of the first field array. @@ -609,7 +609,25 @@ def list_offsets(self) -> pa.ChunkedArray: pa.ChunkedArray The list offsets of the field arrays. """ - return pa.chunked_array([chunk.field(0).offsets for chunk in self._pa_array.iterchunks()]) + # Quick and cheap path for a single chunk + if self._pa_array.num_chunks == 1: + struct_array = cast(pa.StructArray, self._pa_array.chunk(0)) + return cast(pa.ListArray, struct_array.field(0)).offsets + + chunks = [] + # The offset of the current chunk in the flat array. + # It is 0 for the first chunk, and the last offset of the previous chunk for the next chunks, + # as a pa.Scalar. 
+ chunk_offset: pa.Scalar | int = 0 + for chunk in self._pa_array.iterchunks(): + list_array = cast(pa.ListArray, chunk.field(0)) + if chunk_offset == 0: + offsets = list_array.offsets + else: + offsets = pa.compute.add(list_array.offsets[1:], chunk_offset) + chunks.append(offsets) + chunk_offset = offsets[-1] + return pa.concat_arrays(chunks) @property def field_names(self) -> list[str]: @@ -678,8 +696,7 @@ def set_flat_field(self, field: str, value: ArrayLike) -> None: if len(pa_array) != self.flat_length: raise ValueError("The input must be a struct_scalar or have the same length as the flat arrays") - offsets = self.list_offsets.combine_chunks() - list_array = pa.ListArray.from_arrays(values=pa_array, offsets=offsets) + list_array = pa.ListArray.from_arrays(values=pa_array, offsets=self.list_offsets) return self.set_list_field(field, list_array) diff --git a/tests/nested_pandas/series/test_accessor.py b/tests/nested_pandas/series/test_accessor.py index 7b5ed5b..fcb6ee5 100644 --- a/tests/nested_pandas/series/test_accessor.py +++ b/tests/nested_pandas/series/test_accessor.py @@ -71,7 +71,7 @@ def test_to_lists(): def test_to_lists_for_chunked_array(): - """ ""Test that the .nest.to_lists() when underlying array is pa.ChunkedArray.""" + """Test that .nest.to_lists() works when the underlying array is chunked.""" struct_array = pa.StructArray.from_arrays( arrays=[ [np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])], @@ -187,6 +187,43 @@ def test_to_flat(): assert_array_equal(flat[column], desired[column]) +def test_to_flat_for_chunked_array(): + """Test that .nest.to_flat() works when the underlying array is pa.ChunkedArray.""" + struct_array = pa.StructArray.from_arrays( + arrays=[ + [np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])], + [np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])], + ], + names=["a", "b"], + ) + chunked_array = pa.chunked_array([struct_array] * 3) + assert chunked_array.length() == 6 + series = pd.Series(chunked_array, 
dtype=NestedDtype(chunked_array.type), index=[0, 1, 2, 3, 4, 5]) + assert series.array.num_chunks == 3 + + flat = series.nest.to_flat() + + desired = pd.DataFrame( + data={ + "a": pd.Series( + data=[1.0, 2.0, 3.0, -1.0, -2.0, -1.0] * 3, + name="a", + index=np.repeat([0, 1, 2, 3, 4, 5], 3), + dtype=pd.ArrowDtype(pa.float64()), + ), + "b": pd.Series( + data=[4.0, 5.0, 6.0, -3.0, -4.0, -5.0] * 3, + name="b", + index=np.repeat([0, 1, 2, 3, 4, 5], 3), + dtype=pd.ArrowDtype(pa.float64()), + ), + }, + index=pd.Index(np.repeat([0, 1, 2, 3, 4, 5], 3)), + ) + + assert_frame_equal(flat, desired) + + def test_to_flat_with_fields(): """Test that the .nest.to_flat(fields=...) method works.""" struct_array = pa.StructArray.from_arrays( diff --git a/tests/nested_pandas/series/test_ext_array.py b/tests/nested_pandas/series/test_ext_array.py index d04fd2c..1ea0193 100644 --- a/tests/nested_pandas/series/test_ext_array.py +++ b/tests/nested_pandas/series/test_ext_array.py @@ -588,8 +588,8 @@ def test_series_built_raises(data): _array = NestedExtensionArray(pa_array) -def test_list_offsets(): - """Test that the list offsets are correct.""" +def test_list_offsets_single_chunk(): + """Test that the .list_offsets property is correct for a single chunk.""" struct_array = pa.StructArray.from_arrays( arrays=[ pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])], type=pa.list_(pa.uint8())), @@ -603,6 +603,22 @@ def test_list_offsets(): assert_array_equal(ext_array.list_offsets, desired) +def test_list_offsets_multiple_chunks(): + """Test that the .list_offsets property is correct for multiple chunks.""" + struct_array = pa.StructArray.from_arrays( + arrays=[ + pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])], type=pa.list_(pa.uint8())), + pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])]), + ], + names=["a", "b"], + ) + chunked_arrray = pa.chunked_array([struct_array, struct_array[:1], struct_array]) + ext_array = NestedExtensionArray(chunked_arrray) + + desired = 
chunked_arrray.combine_chunks().field("a").offsets + assert_array_equal(ext_array.list_offsets, desired) + + def test___getitem___with_integer(): """Test [i] is a valid DataFrame.""" struct_array = pa.StructArray.from_arrays( From ebcc24b82a09a204376118510f81e0163eee2a29 Mon Sep 17 00:00:00 2001 From: Konstantin Malanchev Date: Tue, 28 May 2024 17:53:09 -0400 Subject: [PATCH 3/3] #89 code as E2E test --- tests/nested_pandas/e2e_tests/test_issue89.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 tests/nested_pandas/e2e_tests/test_issue89.py diff --git a/tests/nested_pandas/e2e_tests/test_issue89.py b/tests/nested_pandas/e2e_tests/test_issue89.py new file mode 100644 index 0000000..6919647 --- /dev/null +++ b/tests/nested_pandas/e2e_tests/test_issue89.py @@ -0,0 +1,29 @@ +"""Based on https://github.com/lincc-frameworks/nested-pandas/issues/89""" + +import nested_pandas as npd +import numpy as np + + +def test_issue89(): + """Check that code snippet from issue 89 works as expected + + https://github.com/lincc-frameworks/nested-pandas/issues/89 + """ + + # Load some ZTF data + catalogs_dir = "https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf/" + + object_ndf = npd.read_parquet( + f"{catalogs_dir}/ztf_object/Norder=3/Dir=0/Npix=432.parquet", + columns=["ra", "dec", "ps1_objid"], + ).set_index("ps1_objid") + + source_ndf = npd.read_parquet( + f"{catalogs_dir}/ztf_source/Norder=6/Dir=20000/Npix=27711.parquet", + columns=["mjd", "mag", "magerr", "band", "ps1_objid", "catflags"], + ).set_index("ps1_objid") + + object_ndf = object_ndf.add_nested(source_ndf, "ztf_source") + + nf = object_ndf + nf.reduce(np.mean, "ztf_source.mjd")