Skip to content

Commit

Permalink
ApplyByRowTransformer and DropColumnTransformer
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmacavaney committed Sep 17, 2024
1 parent d76744d commit 5a087ac
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 21 deletions.
19 changes: 5 additions & 14 deletions pyterrier/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy.typing as npt
import pandas as pd
import pyterrier as pt
from pyterrier.apply_base import ApplyDocumentScoringTransformer, ApplyQueryTransformer, ApplyDocFeatureTransformer, ApplyForEachQuery, ApplyIterForEachQuery, ApplyGenericTransformer, ApplyGenericIterTransformer, ApplyIndexer
from pyterrier.apply_base import ApplyDocumentScoringTransformer, ApplyQueryTransformer, ApplyDocFeatureTransformer, ApplyForEachQuery, ApplyIterForEachQuery, ApplyGenericTransformer, ApplyGenericIterTransformer, ApplyIndexer, DropColumnTransformer, ApplyByRowTransformer

def _bind(instance, func, as_name=None):
"""
Expand Down Expand Up @@ -238,18 +238,9 @@ def __init__(self):
def __getattr__(self, item):
return partial(generic_apply, item)

def generic_apply(name, *args, drop=False, **kwargs) -> pt.Transformer:
def generic_apply(name, fn=None, *, drop=False) -> pt.Transformer:
if drop:
return ApplyGenericTransformer(lambda df : df.drop(name, axis=1), *args, **kwargs)

if len(args) == 0:
raise ValueError("Must specify a fn, e.g. a lambda")

fn = args[0]
args=()
assert fn is None, "cannot provide both fn and drop=True"
return DropColumnTransformer(name)

def _new_column(df):
df[name] = df.apply(fn, axis=1, result_type='reduce')
return df

return ApplyGenericTransformer(_new_column, *args, **kwargs)
return ApplyByRowTransformer(name, fn)
123 changes: 116 additions & 7 deletions pyterrier/apply_base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,122 @@
import math
from typing import Callable, Any, Union, Optional
import pandas as pd

import pyterrier as pt
from .transformer import Transformer, Indexer
from .model import add_ranks, split_df
from typing import Iterable, Iterator
import pandas as pd
import pyterrier as pt


class DropColumnTransformer(pt.Transformer):
"""
This transformer drops the provided column from the input.
"""
def __init__(self, col: str):
"""
Instantiates a DropColumnTransformer.
Arguments:
col: The column to drop
"""
self.col = col

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
"""
Drops the column from the input DataFrame.
Arguments:
inp: The input DataFrame
Returns:
The input DataFrame with the column dropped.
"""
return inp.drop(columns=[self.col])

def transform_iter(self, inp: pt.model.IterDict) -> pt.model.IterDict:
"""
Drops the column from the input IterDict.
Arguments:
inp: The input IterDict
Returns:
The input with the column dropped.
"""
for rec in inp:
new_rec = rec.copy()
new_rec.pop(self.col, None) # None ensures no error if key doesn't exist
yield new_rec

def __repr__(self):
return f"pt.apply.{self.col}(drop=True)"


class ApplyByRowTransformer(pt.Transformer):
"""
This transformer applies a function to each row in the input and assigns the result to a new column.
"""
def __init__(self,
col: str,
fn: Callable[[Union[pt.model.IterDictRecord, pd.Series]], Any],
*,
batch_size: Optional[int] = None,
verbose: bool = False
):
"""
Instantiates a ApplyByRowTransformer.
Arguments:
col: The column to assign the result of the function to
fn: The function to apply to each row
batch_size: The number of rows to process at once. If None, processes in one batch. This only applies
when processing DataFrames.
verbose: Whether to display a progress bar when processing in batch mode.
"""
self.col = col
self.fn = fn
self.batch_size = batch_size
self.verbose = verbose

def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
"""
Applies the function to each row in the input DataFrame and assigns the result to a new column.
Arguments:
inp: The input DataFrame
Returns:
The input DataFrame with the new values assign to ``col``
"""
if self.batch_size is None:
return self._apply_df(inp)

# batching
num_chunks = math.ceil(len(inp) / self.batch_size)
iterator = pt.model.split_df(inp, num_chunks)
if self.verbose:
iterator = pt.tqdm(iterator, desc="pt.apply", unit='row')
return pd.concat([self._apply_df(chunk_df) for chunk_df in iterator])

def _apply_df(self, inp: pd.DataFrame) -> pd.DataFrame:
new_vals = inp.apply(self.fn, axis=1, result_type='reduce')
return inp.assign(**{self.col: new_vals})

def transform_iter(self, inp: pt.model.IterDict) -> pt.model.IterDict:
"""
Applies the function to each row in the input IterDict and assigns the result to a new column.
Arguments:
inp: The input IterDict
Returns:
The input IterDict with the new values assign to ``col``
"""
for rec in inp:
yield dict(rec, **{self.col: self.fn(rec)})

def __repr__(self):
return f"pt.apply.{self.col}()"


class ApplyTransformerBase(Transformer):
"""
Expand Down Expand Up @@ -34,9 +147,6 @@ def transform(self, res):
if len(res) == 0:
return self.fn(res)

import math, pandas as pd
from pyterrier.model import split_df

it = res.groupby("qid")
lastqid = None
if self.verbose:
Expand Down Expand Up @@ -306,7 +416,6 @@ def transform(self, inputRes):
return self.fn(inputRes)

# batching
import math, pandas as pd
from pyterrier.model import split_df
num_chunks = math.ceil( len(inputRes) / self.batch_size )
iterator = split_df(inputRes, num_chunks)
Expand Down

0 comments on commit 5a087ac

Please sign in to comment.