Skip to content

Commit

Permalink
Add NestedFrame.reduce (#32)
Browse files Browse the repository at this point in the history
* Propagate index name in `to_flat`

* Add comment

* Add 'reduce' implementation for a nested 'apply'

* Add user function example

* Delete docs/notebooks/test_nested.ipynb

* Clean up conditional

* Fix precommit failures

* fix typing

* Format fix

* The final typing

* Typing changes

* The final typing?

* Yet another lint fix

* Try to make mypy happy with an empty tuple

* And another ruff check

* Addressed comment

* Removed whitespace
  • Loading branch information
wilsonbb authored Apr 18, 2024
1 parent ac4032f commit 025ad96
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 1 deletion.
90 changes: 89 additions & 1 deletion src/nested_pandas/nestedframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import numpy as np
import pandas as pd
from pandas._libs import lib
from pandas._typing import AnyAll, Axis, IndexLabel
from pandas._typing import Any, AnyAll, Axis, IndexLabel
from pandas.api.extensions import no_default

from nested_pandas.series import packer
Expand Down Expand Up @@ -58,6 +58,10 @@ def _is_known_hierarchical_column(self, colname) -> bool:
return False
return False

def _is_known_column(self, colname) -> bool:
    """Report whether `colname` is a base column or a known hierarchical column."""
    # A direct hit among the frame's own columns settles the question early.
    if colname in self.columns:
        return True
    # Otherwise defer to the hierarchical ("layer.column") lookup.
    return self._is_known_hierarchical_column(colname)

def add_nested(self, nested, name) -> Self: # type: ignore[name-defined] # noqa: F821
"""Packs a dataframe into a nested column"""
# Add sources to objects
Expand Down Expand Up @@ -310,3 +314,87 @@ def dropna(
)
)
return new_df

def reduce(self, func, *args, **kwargs) -> NestedFrame:
    """
    Takes a function and applies it to each top-level row of the NestedFrame.

    The user may specify which columns the function is applied to, with
    columns from the 'base' layer being passed to the function as
    scalars and columns from the nested layers being passed as numpy arrays.

    Parameters
    ----------
    func : callable
        Function to apply to each row. Its first parameters receive the values of
        the requested columns, in the order the column names appear in `args`.
    args : positional arguments
        Positional arguments to pass to the function, the first *args should be the names of the
        columns to apply the function to. The first argument that is not a string naming a
        known column, and everything after it, is forwarded to `func` unchanged.
    kwargs : keyword arguments, optional
        Keyword arguments to pass to the function.

    Returns
    -------
    `NestedFrame`
        The result of the row-wise `apply`; outputs are expanded into columns when
        `func` returns a `pd.Series`.

    Raises
    ------
    ValueError
        If the leading entries of `args` do not name at least one known column.

    Notes
    -----
    The recommended return value of func is a `pd.Series` where the indices are the names of the
    output columns in the dataframe returned by `reduce`. Note however that in cases where func
    returns a single value there may be a performance benefit to returning the scalar value
    rather than a `pd.Series`.

    Example User Function:
    ```
    import pandas as pd
    def my_sum(col1, col2):
        return pd.Series(
            [sum(col1), sum(col2)],
            index=["sum_col1", "sum_col2"],
        )
    ```
    """
    # Parse through the initial args to determine the columns to apply the function to
    requested_columns = []
    for arg in args:
        if not isinstance(arg, str) or not self._is_known_column(arg):
            # We've reached an argument that is not a valid column, so we assume
            # the remaining args are extra arguments to the function
            break
        if "." in arg and self._is_known_hierarchical_column(arg):
            # "layer.column" naming a field inside a nested layer
            parts = arg.split(".")
            requested_columns.append((parts[0], parts[-1]))
        else:
            # A base column. Its name is kept whole even when it contains a ".",
            # since `_is_known_column` accepts any name found in `self.columns`.
            requested_columns.append(("base", arg))

    # We require the first *args to be the columns to apply the function to
    if not requested_columns:
        raise ValueError("No columns in `*args` specified to apply function to")

    # The remaining args are the extra arguments to the function other than columns.
    # Slicing past the end of a tuple yields an empty tuple, so no guard is needed.
    extra_args: tuple[Any, ...] = args[len(requested_columns) :]

    # Translates the requested columns into the scalars or arrays we pass to func.
    def translate_cols(frame, layer, col):
        if layer == "base":
            # We pass the "base" column as a scalar
            return frame[col]
        return frame[layer][col].to_numpy()

    # Note that this applies the function to each row of the nested dataframe. For
    # the columns within packed frames, note that we're directly accessing the dataframe
    # within the cell of that row without having to unpack and flatten.
    result = self.apply(
        lambda x: func(
            *[translate_cols(x, layer, col) for layer, col in requested_columns], *extra_args, **kwargs
        ),
        axis=1,  # to apply func on each row of our nested frame
        result_type="expand",  # to return a DataFrame when possible
    )

    return result
80 changes: 80 additions & 0 deletions tests/nested_pandas/nestedframe/test_nestedframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,83 @@ def test_dropna_errors():
# Test on-nested + subset disagreement
with pytest.raises(ValueError):
base.dropna(on_nested="nested", subset=["b"])


def test_reduce():
    """Tests that we can call reduce on a NestedFrame with a custom function."""
    nf = NestedFrame(
        data={"a": [1, 2, 3], "b": [2, 4, 6]},
        index=pd.Index([0, 1, 2], name="idx"),
    )

    to_pack = pd.DataFrame(
        data={
            "time": [1, 2, 3, 1, 2, 4, 2, 1, 4],
            "c": [0, 2, 4, 10, 4, 3, 1, 4, 1],
            "d": [5, 4, 7, 5, 3, 1, 9, 3, 4],
        },
        index=pd.Index([0, 0, 0, 1, 1, 1, 2, 2, 2], name="idx"),
    )

    to_pack2 = pd.DataFrame(
        data={
            "time": [1, 2, 3, 1, 2, 3, 1, 2, 4],
            "e": [2, 9, 4, 1, 23, 3, 1, 4, 1],
            "f": [5, 4, 7, 5, 3, 25, 9, 3, 4],
        },
        index=pd.Index([0, 0, 0, 1, 1, 1, 2, 2, 2], name="idx"),
    )

    # Add two nested layers to pack into our dataframe
    nf = nf.add_nested(to_pack, "packed").add_nested(to_pack2, "packed2")

    # Define a simple custom function to apply to the nested data
    def get_max(col1, col2):
        # returns the max value within each specified column
        return pd.Series([col1.max(), col2.max()], index=["max_col1", "max_col2"])

    # The expected max values for each of our nested columns, one entry per base row
    expected_max_c = [4, 10, 4]
    expected_max_d = [7, 5, 9]
    expected_max_e = [9, 23, 4]

    # Test that we raise an error when no arguments are provided
    with pytest.raises(ValueError):
        nf.reduce(get_max)

    # Batch only on columns in the first packed layer
    result = nf.reduce(get_max, "packed.c", "packed.d")
    assert len(result) == len(nf)
    assert isinstance(result, NestedFrame)
    assert result.index.name == "idx"
    # Compare whole columns at once rather than element by element
    assert list(result["max_col1"]) == expected_max_c
    assert list(result["max_col2"]) == expected_max_d

    # Batch on columns in the first and second packed layers
    result = nf.reduce(get_max, "packed.c", "packed2.e")
    assert len(result) == len(nf)
    assert isinstance(result, NestedFrame)
    assert result.index.name == "idx"
    assert list(result["max_col1"]) == expected_max_c
    assert list(result["max_col2"]) == expected_max_e

    # Test that we can pass a scalar from the base layer to the reduce function and that
    # the user can also provide non-column arguments (in this case, the list of column names)
    def offset_avg(offset, col_to_avg, column_names):
        # A simple function which adds a scalar 'offset' to a column which is then averaged.
        return pd.Series([(offset + col_to_avg).mean()], index=column_names)

    expected_offset_avg = [
        sum([2, 4, 6]) / 3.0,
        sum([14, 8, 7]) / 3.0,
        sum([7, 10, 7]) / 3.0,
    ]

    result = nf.reduce(offset_avg, "b", "packed.c", ["offset_avg"])
    assert len(result) == len(nf)
    assert isinstance(result, NestedFrame)
    assert result.index.name == "idx"
    # The averages are computed floats, so compare with a tolerance instead of exact equality
    assert list(result["offset_avg"]) == pytest.approx(expected_offset_avg)

0 comments on commit 025ad96

Please sign in to comment.