diff --git a/src/nested_pandas/nestedframe/core.py b/src/nested_pandas/nestedframe/core.py index 6ebef11..c9d6297 100644 --- a/src/nested_pandas/nestedframe/core.py +++ b/src/nested_pandas/nestedframe/core.py @@ -4,7 +4,7 @@ import numpy as np import pandas as pd from pandas._libs import lib -from pandas._typing import AnyAll, Axis, IndexLabel +from pandas._typing import Any, AnyAll, Axis, IndexLabel from pandas.api.extensions import no_default from nested_pandas.series import packer @@ -58,6 +58,10 @@ def _is_known_hierarchical_column(self, colname) -> bool: return False return False + def _is_known_column(self, colname) -> bool: + """Determine whether a string is a known column name""" + return colname in self.columns or self._is_known_hierarchical_column(colname) + def add_nested(self, nested, name) -> Self: # type: ignore[name-defined] # noqa: F821 """Packs a dataframe into a nested column""" # Add sources to objects @@ -310,3 +314,87 @@ def dropna( ) ) return new_df + + def reduce(self, func, *args, **kwargs) -> NestedFrame: + """ + Takes a function and applies it to each top-level row of the NestedFrame. + + The user may specify which columns the function is applied to, with + columns from the 'base' layer being passsed to the function as + scalars and columns from the nested layers being passed as numpy arrays. + + Parameters + ---------- + func : callable + Function to apply to each nested dataframe. The first arguments to `func` should be which + columns to apply the function to. + args : positional arguments + Positional arguments to pass to the function, the first *args should be the names of the + columns to apply the function to. + kwargs : keyword arguments, optional + Keyword arguments to pass to the function. + + Returns + ------- + `NestedFrame` + `NestedFrame` with the results of the function applied to the columns of the frame. + + Notes + ----- + The recommend return value of func should be a `pd.Series` where the indices are the names of the + output columns in the dataframe returned by `reduce`. Note however that in cases where func + returns a single value there may be a performance benefit to returning the scalar value + rather than a `pd.Series`. + + Example User Function: + ``` + import pandas as pd + + def my_sum(col1, col2): + return pd.Series( + [sum(col1), sum(col2)], + index=["sum_col1", "sum_col2"], + ) + + ``` + + """ + # Parse through the initial args to determine the columns to apply the function to + requested_columns = [] + for arg in args: + if not isinstance(arg, str) or not self._is_known_column(arg): + # We've reached an argument that is not a valid column, so we assume + # the remaining args are extra arguments to the function + break + layer = "base" if "." not in arg else arg.split(".")[0] + col = arg.split(".")[-1] + requested_columns.append((layer, col)) + + # We require the first *args to be the columns to apply the function to + if not requested_columns: + raise ValueError("No columns in `*args` specified to apply function to") + + # The remaining args are the extra arguments to the function other than columns + extra_args: tuple[Any, ...] = () # empty tuple to make mypy happy + if len(requested_columns) < len(args): + extra_args = args[len(requested_columns) :] + + # Translates the requested columns into the scalars or arrays we pass to func. + def translate_cols(frame, layer, col): + if layer == "base": + # We pass the "base" column as a scalar + return frame[col] + return frame[layer][col].to_numpy() + + # Note that this applys the function to each row of the nested dataframe. For + # the columns within packed frames, note taht we're directly accessing the dataframe + # within the cell of that row without having to unpack and flatten. + result = self.apply( + lambda x: func( + *[translate_cols(x, layer, col) for layer, col in requested_columns], *extra_args, **kwargs + ), + axis=1, # to apply func on each row of our nested frame + result_type="expand", # to return a DataFrame when possible + ) + + return result diff --git a/tests/nested_pandas/nestedframe/test_nestedframe.py b/tests/nested_pandas/nestedframe/test_nestedframe.py index 7f39c8e..782dbf7 100644 --- a/tests/nested_pandas/nestedframe/test_nestedframe.py +++ b/tests/nested_pandas/nestedframe/test_nestedframe.py @@ -222,3 +222,83 @@ def test_dropna_errors(): # Test on-nested + subset disagreement with pytest.raises(ValueError): base.dropna(on_nested="nested", subset=["b"]) + + +def test_reduce(): + """Tests that we can call reduce on a NestedFrame with a custom function.""" + nf = NestedFrame( + data={"a": [1, 2, 3], "b": [2, 4, 6]}, + index=pd.Index([0, 1, 2], name="idx"), + ) + + to_pack = pd.DataFrame( + data={ + "time": [1, 2, 3, 1, 2, 4, 2, 1, 4], + "c": [0, 2, 4, 10, 4, 3, 1, 4, 1], + "d": [5, 4, 7, 5, 3, 1, 9, 3, 4], + }, + index=pd.Index([0, 0, 0, 1, 1, 1, 2, 2, 2], name="idx"), + ) + + to_pack2 = pd.DataFrame( + data={ + "time": [1, 2, 3, 1, 2, 3, 1, 2, 4], + "e": [2, 9, 4, 1, 23, 3, 1, 4, 1], + "f": [5, 4, 7, 5, 3, 25, 9, 3, 4], + }, + index=pd.Index([0, 0, 0, 1, 1, 1, 2, 2, 2], name="idx"), + ) + + # Add two nested layers to pack into our dataframe + nf = nf.add_nested(to_pack, "packed").add_nested(to_pack2, "packed2") + + # Define a simple custom function to apply to the nested data + def get_max(col1, col2): + # returns the max value within each specified colun + return pd.Series([col1.max(), col2.max()], index=["max_col1", "max_col2"]) + + # The expected max values for of our nested columns + expected_max_c = [4, 10, 4] + expected_max_d = [7, 5, 9] + expected_max_e = [9, 23, 4] + + # Test that we raise an error when no arguments are provided + with pytest.raises(ValueError): + nf.reduce(get_max) + + # Batch only on columns in the first packed layer + result = nf.reduce(get_max, "packed.c", "packed.d") + assert len(result) == len(nf) + assert isinstance(result, NestedFrame) + assert result.index.name == "idx" + for i in range(len(result)): + assert result["max_col1"].values[i] == expected_max_c[i] + assert result["max_col2"].values[i] == expected_max_d[i] + + # Batch on columns in the first and second packed layers + result = nf.reduce(get_max, "packed.c", "packed2.e") + assert len(result) == len(nf) + assert isinstance(result, NestedFrame) + assert result.index.name == "idx" + for i in range(len(result)): + assert result["max_col1"].values[i] == expected_max_c[i] + assert result["max_col2"].values[i] == expected_max_e[i] + + # Test that we can pass a scalar from the base layer to the reduce function and that + # the user can also provide non-column arguments (in this case, the list of column names) + def offset_avg(offset, col_to_avg, column_names): + # A simple function which adds a scalar 'offset' to a column which is then averaged. + return pd.Series([(offset + col_to_avg).mean()], index=column_names) + + expected_offset_avg = [ + sum([2, 4, 6]) / 3.0, + sum([14, 8, 7]) / 3.0, + sum([7, 10, 7]) / 3.0, + ] + + result = nf.reduce(offset_avg, "b", "packed.c", ["offset_avg"]) + assert len(result) == len(nf) + assert isinstance(result, NestedFrame) + assert result.index.name == "idx" + for i in range(len(result)): + assert result["offset_avg"].values[i] == expected_offset_avg[i]