Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix .nest.to_flat / to_lists #90

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions src/nested_pandas/series/accessor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Python 3.9 doesn't support "|" for types
from __future__ import annotations

from collections import defaultdict
from collections.abc import Generator, MutableMapping
from typing import cast

Expand Down Expand Up @@ -53,14 +54,19 @@ def to_lists(self, fields: list[str] | None = None) -> pd.DataFrame:
if len(fields) == 0:
raise ValueError("Cannot convert a struct with no fields to lists")

struct_array = cast(pa.StructArray, pa.array(self._series))
list_chunks = defaultdict(list)
for chunk in self._series.array._pa_array.iterchunks():
struct_array = cast(pa.StructArray, chunk)
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
list_chunks[field].append(list_array)

list_series = {}
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
for field, chunks in list_chunks.items():
chunked_array = pa.chunked_array(chunks)
list_series[field] = pd.Series(
list_array,
dtype=pd.ArrowDtype(list_array.type),
chunked_array,
dtype=pd.ArrowDtype(chunked_array.type),
index=self._series.index,
name=field,
copy=False,
Expand All @@ -85,21 +91,25 @@ def to_flat(self, fields: list[str] | None = None) -> pd.DataFrame:
if len(fields) == 0:
raise ValueError("Cannot flatten a struct with no fields")

struct_array = cast(pa.StructArray, pa.array(self._series))
index = pd.Series(self.get_flat_index(), name=self._series.index.name)

flat_chunks = defaultdict(list)
for chunk in self._series.array._pa_array.iterchunks():
struct_array = cast(pa.StructArray, chunk)
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
flat_array = list_array.flatten()
flat_chunks[field].append(flat_array)

flat_series = {}
index = None
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
flat_array = list_array.flatten()
if index is None:
index = self.get_flat_index()
for field, chunks in flat_chunks.items():
chunked_array = pa.chunked_array(chunks)
flat_series[field] = pd.Series(
flat_array,
chunked_array,
index=pd.Series(index, name=self._series.index.name),
name=field,
copy=False,
dtype=pd.ArrowDtype(flat_array.type),
dtype=pd.ArrowDtype(chunked_array.type),
)

return pd.DataFrame(flat_series)
Expand Down
30 changes: 26 additions & 4 deletions src/nested_pandas/series/ext_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ def _replace_pa_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> Non
self._dtype = NestedDtype(pa_array.chunk(0).type)

@property
def list_offsets(self) -> pa.ChunkedArray:
def list_offsets(self) -> pa.Array:
"""The list offsets of the field arrays.

It is a chunk array of list offsets of the first field array.
Expand All @@ -609,7 +609,25 @@ def list_offsets(self) -> pa.ChunkedArray:
pa.ChunkedArray
The list offsets of the field arrays.
"""
return pa.chunked_array([chunk.field(0).offsets for chunk in self._pa_array.iterchunks()])
# Quick and cheap path for a single chunk
if self._pa_array.num_chunks == 1:
struct_array = cast(pa.StructArray, self._pa_array.chunk(0))
return cast(pa.ListArray, struct_array.field(0)).offsets

chunks = []
# The offset of the current chunk in the flat array.
# It is 0 for the first chunk, and the last offset of the previous chunk for the next chunks,
# as a pa.Scalar.
chunk_offset: pa.Scalar | int = 0
for chunk in self._pa_array.iterchunks():
list_array = cast(pa.ListArray, chunk.field(0))
if chunk_offset == 0:
offsets = list_array.offsets
else:
offsets = pa.compute.add(list_array.offsets[1:], chunk_offset)
chunks.append(offsets)
chunk_offset = offsets[-1]
return pa.concat_arrays(chunks)

@property
def field_names(self) -> list[str]:
Expand All @@ -621,6 +639,11 @@ def flat_length(self) -> int:
"""Length of the flat arrays"""
return sum(chunk.field(0).value_lengths().sum().as_py() for chunk in self._pa_array.iterchunks())

@property
def num_chunks(self) -> int:
"""Number of chunks in underlying pyarrow.ChunkedArray"""
return self._pa_array.num_chunks

def view_fields(self, fields: str | list[str]) -> Self: # type: ignore[name-defined] # noqa: F821
"""Get a view of the series with only the specified fields

Expand Down Expand Up @@ -673,8 +696,7 @@ def set_flat_field(self, field: str, value: ArrayLike) -> None:
if len(pa_array) != self.flat_length:
raise ValueError("The input must be a struct_scalar or have the same length as the flat arrays")

offsets = self.list_offsets.combine_chunks()
list_array = pa.ListArray.from_arrays(values=pa_array, offsets=offsets)
list_array = pa.ListArray.from_arrays(values=pa_array, offsets=self.list_offsets)

return self.set_list_field(field, list_array)

Expand Down
29 changes: 29 additions & 0 deletions tests/nested_pandas/e2e_tests/test_issue89.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Based on https://github.com/lincc-frameworks/nested-pandas/issues/89"""

import nested_pandas as npd
import numpy as np


def test_issue89():
"""Check that code snippet from issue 89 works as expected

https://github.com/lincc-frameworks/nested-pandas/issues/89
"""

# Load some ZTF data
catalogs_dir = "https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf/"

object_ndf = npd.read_parquet(
f"{catalogs_dir}/ztf_object/Norder=3/Dir=0/Npix=432.parquet",
columns=["ra", "dec", "ps1_objid"],
).set_index("ps1_objid")

source_ndf = npd.read_parquet(
f"{catalogs_dir}/ztf_source/Norder=6/Dir=20000/Npix=27711.parquet",
columns=["mjd", "mag", "magerr", "band", "ps1_objid", "catflags"],
).set_index("ps1_objid")

object_ndf = object_ndf.add_nested(source_ndf, "ztf_source")

nf = object_ndf
nf.reduce(np.mean, "ztf_source.mjd")
70 changes: 70 additions & 0 deletions tests/nested_pandas/series/test_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,39 @@ def test_to_lists():
assert_frame_equal(lists, desired)


def test_to_lists_for_chunked_array():
""" ""Test that the .nest.to_lists() when underlying array is chunked"""
struct_array = pa.StructArray.from_arrays(
arrays=[
[np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])],
[np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])],
],
names=["a", "b"],
)
chunked_array = pa.chunked_array([struct_array] * 3)
assert chunked_array.length() == 6
series = pd.Series(chunked_array, dtype=NestedDtype(chunked_array.type), index=[0, 1, 2, 3, 4, 5])
assert series.array.num_chunks == 3

lists = series.nest.to_lists()

desired = pd.DataFrame(
data={
"a": pd.Series(
data=[np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])] * 3,
dtype=pd.ArrowDtype(pa.list_(pa.float64())),
index=[0, 1, 2, 3, 4, 5],
),
"b": pd.Series(
data=[np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])] * 3,
dtype=pd.ArrowDtype(pa.list_(pa.float64())),
index=[0, 1, 2, 3, 4, 5],
),
},
)
assert_frame_equal(lists, desired)


def test_to_lists_with_fields():
"""Test that the .nest.to_lists(fields=...) method works."""
struct_array = pa.StructArray.from_arrays(
Expand Down Expand Up @@ -154,6 +187,43 @@ def test_to_flat():
assert_array_equal(flat[column], desired[column])


def test_to_flat_for_chunked_array():
"""Test that the .nest.to_flat() when underlying array is pa.ChunkedArray."""
struct_array = pa.StructArray.from_arrays(
arrays=[
[np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])],
[np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])],
],
names=["a", "b"],
)
chunked_array = pa.chunked_array([struct_array] * 3)
assert chunked_array.length() == 6
series = pd.Series(chunked_array, dtype=NestedDtype(chunked_array.type), index=[0, 1, 2, 3, 4, 5])
assert series.array.num_chunks == 3

flat = series.nest.to_flat()

desired = pd.DataFrame(
data={
"a": pd.Series(
data=[1.0, 2.0, 3.0, -1.0, -2.0, -1.0] * 3,
name="a",
index=np.repeat([0, 1, 2, 3, 4, 5], 3),
dtype=pd.ArrowDtype(pa.float64()),
),
"b": pd.Series(
data=[4.0, 5.0, 6.0, -3.0, -4.0, -5.0] * 3,
name="b",
index=np.repeat([0, 1, 2, 3, 4, 5], 3),
dtype=pd.ArrowDtype(pa.float64()),
),
},
index=pd.Index(np.repeat([0, 1, 2, 3, 4, 5], 3)),
)

assert_frame_equal(flat, desired)


def test_to_flat_with_fields():
"""Test that the .nest.to_flat(fields=...) method works."""
struct_array = pa.StructArray.from_arrays(
Expand Down
35 changes: 33 additions & 2 deletions tests/nested_pandas/series/test_ext_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,8 @@ def test_series_built_raises(data):
_array = NestedExtensionArray(pa_array)


def test_list_offsets():
"""Test that the list offsets are correct."""
def test_list_offsets_single_chunk():
"""Test that the .list_offset property is correct for a single chunk."""
struct_array = pa.StructArray.from_arrays(
arrays=[
pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])], type=pa.list_(pa.uint8())),
Expand All @@ -603,6 +603,22 @@ def test_list_offsets():
assert_array_equal(ext_array.list_offsets, desired)


def test_list_offsets_multiple_chunks():
"""Test that the .list_offset property is correct for multiple chunks."""
struct_array = pa.StructArray.from_arrays(
arrays=[
pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])], type=pa.list_(pa.uint8())),
pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])]),
],
names=["a", "b"],
)
chunked_arrray = pa.chunked_array([struct_array, struct_array[:1], struct_array])
ext_array = NestedExtensionArray(chunked_arrray)

desired = chunked_arrray.combine_chunks().field("a").offsets
assert_array_equal(ext_array.list_offsets, desired)


def test___getitem___with_integer():
"""Test [i] is a valid DataFrame."""
struct_array = pa.StructArray.from_arrays(
Expand Down Expand Up @@ -1207,6 +1223,21 @@ def test_flat_length():
assert ext_array.flat_length == 7


def test_num_chunks():
"""Tests .num_chunks property."""
struct_array = pa.StructArray.from_arrays(
arrays=[
pa.array([np.array([1.0, 2.0, 3.0]), np.array([1.0, 2.0, 1.0, 2.0])]),
pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0, 6.0])]),
],
names=["a", "b"],
)
chunked_array = pa.chunked_array([struct_array] * 7)
ext_array = NestedExtensionArray(chunked_array)

assert ext_array.num_chunks == 7


def test_view_fields_with_single_field():
"""Tests ext_array.view("field")"""
arrays = [
Expand Down