API change: Transformer.transform_iter() returns an iter-dict generator (#481)

(1) .transform_iter() takes and returns an iter-dict generator (an iterable of dicts), rather than a DataFrame.
(2) As a consequence of (1), .__call__() takes and returns an iter-dict, an iter-dict generator, OR a DataFrame.
(3) Implementers need only implement .transform() OR .transform_iter(). .__call__() redirects based on the type of the input. The default implementation of .transform() is to call .transform_iter() (and vice versa).
(4) Add an easy-to-remember kwarg for pt.apply to take iter-dicts (iter=True).
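As a stdlib-only sketch of the dispatch described in (2)–(3) — not the actual PyTerrier code, and with the DataFrame path omitted and the class names hypothetical — `__call__` routing by input type looks roughly like this:

```python
from typing import Any, Dict, Iterable, Iterator


class Transformer:
    """Simplified stand-in for pt.Transformer: subclasses implement
    transform_iter(), and __call__ routes based on the input type."""

    def transform_iter(self, inp: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        raise NotImplementedError

    def __call__(self, inp):
        if isinstance(inp, dict):
            # single record in, single record out
            return next(iter(self.transform_iter([inp])))
        # otherwise treat the input as an iter-dict and return a generator
        return self.transform_iter(inp)


class UpperCaseQuery(Transformer):
    def transform_iter(self, inp):
        for rec in inp:
            yield {**rec, "query": rec["query"].upper()}


out = list(UpperCaseQuery()([{"qid": "1", "query": "chemical reactions"}]))
```

Because `transform_iter()` is a generator, nothing is computed until the caller consumes the output — the property the commit exploits for streaming pipelines.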

Co-authored-by: Sean MacAvaney
cmacdonald authored Sep 18, 2024
1 parent e94b29c commit f8e47fb
Showing 13 changed files with 764 additions and 255 deletions.
45 changes: 23 additions & 22 deletions docs/apply.rst
@@ -23,28 +23,29 @@ in a PyTerrier pipeline.

Each apply method takes as input a function (e.g. a function name, or a lambda expression).
Objects that are passed to the function vary in terms of the type of the input dataframe
(queries or ranked documents), and also vary in terms of what should be returned by the
function.

+-------+---------+-------------+------------------+---------------------------+----------------------+-----------------------+
| Input | Output  | Cardinality | Example          | Example apply             | Function Input type  | Function Return type  |
+=======+=========+=============+==================+===========================+======================+=======================+
| Q     | Q       | 1 to 1      | Query rewriting  | `pt.apply.query()`        | row of one query     | str                   |
+-------+---------+-------------+------------------+---------------------------+----------------------+-----------------------+
| Q x D | Q x D   | 1 to 1      | Re-ranking       | `pt.apply.doc_score()`    | row of one document  | float                 |
+-------+---------+-------------+------------------+---------------------------+----------------------+-----------------------+
| Q x D | Q x Df  | 1 to 1      | Feature scoring  | `pt.apply.doc_features()` | row of one document  | numpy array           |
+-------+---------+-------------+------------------+---------------------------+----------------------+-----------------------+
| Q x D | Q       | N to 1      | Query expansion  | `pt.apply.generic()`      | entire dataframe     | entire dataframe      |
|       |         |             |                  +---------------------------+----------------------+-----------------------+
|       |         |             |                  | `pt.apply.by_query()`     | dataframe for 1 query| dataframe for 1 query |
+-------+---------+-------------+------------------+---------------------------+----------------------+-----------------------+
| Q     | Q x D   | 1 to N      | Retrieval        | `pt.apply.generic()`      | entire dataframe     | entire dataframe      |
|       |         |             |                  +---------------------------+----------------------+-----------------------+
|       |         |             |                  | `pt.apply.by_query()`     | dataframe for 1 query| dataframe for 1 query |
+-------+---------+-------------+------------------+---------------------------+----------------------+-----------------------+
| D     | None    | N to 0      | Indexing         | `pt.apply.indexer()`      | iterable dictionary  | anything              |
+-------+---------+-------------+------------------+---------------------------+----------------------+-----------------------+
(queries or ranked documents), whether they represent one row (pd.Series or dictionary) or
many rows (pd.DataFrame or list of dictionaries), and also vary in terms of what should be
returned by the function.

+-------+---------+-------------+------------------+---------------------------+-------------------------------------+-------------------------------------------+
| Input | Output  | Cardinality | Example          | Example apply             | Function Input type                 | Function Return type                      |
+=======+=========+=============+==================+===========================+=====================================+===========================================+
| Q     | Q       | 1 to 1      | Query rewriting  | `pt.apply.query()`        | row of one query (pd.Series/dict)   | str                                       |
+-------+---------+-------------+------------------+---------------------------+-------------------------------------+-------------------------------------------+
| Q x D | Q x D   | 1 to 1      | Re-ranking       | `pt.apply.doc_score()`    | row of one document (pd.Series/dict)| float                                     |
+-------+---------+-------------+------------------+---------------------------+-------------------------------------+-------------------------------------------+
| Q x D | Q x Df  | 1 to 1      | Feature scoring  | `pt.apply.doc_features()` | row of one document (pd.Series/dict)| numpy array                               |
+-------+---------+-------------+------------------+---------------------------+-------------------------------------+-------------------------------------------+
| Q x D | Q       | N to 1      | Query expansion  | `pt.apply.generic()`      | entire (dataframe/iter-dict)        | entire dataframe / iterable of dict       |
|       |         |             |                  +---------------------------+-------------------------------------+-------------------------------------------+
|       |         |             |                  | `pt.apply.by_query()`     | (dataframe/iter-dict) for 1 query   | dataframe / iterable of dict for 1 query  |
+-------+---------+-------------+------------------+---------------------------+-------------------------------------+-------------------------------------------+
| Q     | Q x D   | 1 to N      | Retrieval        | `pt.apply.generic()`      | entire (dataframe/iter-dict)        | entire dataframe                          |
|       |         |             |                  +---------------------------+-------------------------------------+-------------------------------------------+
|       |         |             |                  | `pt.apply.by_query()`     | (dataframe/iter-dict) for 1 query   | dataframe / iterable of dict for 1 query  |
+-------+---------+-------------+------------------+---------------------------+-------------------------------------+-------------------------------------------+
| D     | None    | N to 0      | Indexing         | `pt.apply.indexer()`      | iterable dictionary                 | anything                                  |
+-------+---------+-------------+------------------+---------------------------+-------------------------------------+-------------------------------------------+
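The 1-to-1 query-rewriting contract in the table can be illustrated without PyTerrier itself. The sketch below (plain Python, with a hypothetical rewriting function and topic data) mimics what `pt.apply.query()` does per record, including saving the prior formulation in `query_0`:

```python
# A query-rewriting function sees one row (here a dict) and returns the
# new query string -- the "Q -> Q, 1 to 1" contract from the table.
def add_synonym(row):
    return row["query"] + " vehicle" if "car" in row["query"] else row["query"]

topics = [{"qid": "1", "query": "car insurance"},
          {"qid": "2", "query": "tax law"}]

# What pt.apply.query(add_synonym) does for each record (simplified sketch):
rewritten = [{**t, "query_0": t["query"], "query": add_synonym(t)}
             for t in topics]
```

Because the function only uses `row["key"]` access, the same code serves both the pd.Series and dict input types listed in the table.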

In each case, the result from calling a pyterrier.apply method is another PyTerrier transformer
(i.e. extends ``pt.Transformer``), which can be used for experimentation or combined with other
1 change: 1 addition & 0 deletions docs/transformer.rst
@@ -54,6 +54,7 @@ This class is the base class for all transformers.

.. autoclass:: pyterrier.Transformer
:members:
:special-members: __call__


Default Method
3 changes: 1 addition & 2 deletions pyterrier/__init__.py
@@ -2,7 +2,7 @@

from deprecated import deprecated

from pyterrier import utils
from pyterrier import model, utils
from pyterrier.transformer import Transformer, Estimator, Indexer

from pyterrier import java
@@ -14,7 +14,6 @@
from pyterrier import debug
from pyterrier import io
from pyterrier import measures
from pyterrier import model
from pyterrier import new
from pyterrier import ltr
from pyterrier import parallel
91 changes: 59 additions & 32 deletions pyterrier/apply.py
@@ -1,9 +1,9 @@
from functools import partial
from typing import Callable, Any, Dict, Union, Iterator, Sequence
from typing import Callable, Any, Dict, Union, Optional, Sequence
import numpy.typing as npt
import pandas as pd
import pyterrier as pt
from pyterrier.apply_base import ApplyDocumentScoringTransformer, ApplyQueryTransformer, ApplyDocFeatureTransformer, ApplyForEachQuery, ApplyGenericTransformer, ApplyIndexer
from pyterrier.apply_base import ApplyDocumentScoringTransformer, ApplyQueryTransformer, ApplyDocFeatureTransformer, ApplyForEachQuery, ApplyIterForEachQuery, ApplyGenericTransformer, ApplyGenericIterTransformer, ApplyIndexer, DropColumnTransformer, ApplyByRowTransformer

def _bind(instance, func, as_name=None):
"""
@@ -17,15 +17,17 @@ def _bind(instance, func, as_name=None):
setattr(instance, as_name, bound_method)
return bound_method

def query(fn : Callable[[pd.Series], str], *args, **kwargs) -> pt.Transformer:
def query(fn : Callable[[Union[pd.Series,pt.model.IterDictRecord]], str], *args, **kwargs) -> pt.Transformer:
"""
Create a transformer that takes as input a query, and applies a supplied function to compute a new query formulation.
The supplied function is called once for each query, and must return a string containing the new query formulation.
Each time it is called, the function is supplied with a Pandas Series representing the attributes of the query.
Each time it is called, the function is supplied with a Pandas Series or dict representing the attributes of the query.
The particular type of the input is not controlled by the implementor, so the function should be written to support
both, e.g. using ``row["key"]`` notation and not the ``row.key`` that is supported by a Series.
The previous query formulation is saved in the "query_0" column. If a later pipeline stage is intended to
be executed on the previous query formulation, a `pt.rewrite.reset()` transformer can be applied.
be executed on the previous query formulation, a ``pt.rewrite.reset()`` transformer can be applied.
Arguments:
fn(Callable): the function to apply to each row. It must return a string containing the new query formulation.
@@ -51,15 +53,15 @@ def _remove_stops(q):
) >> pt.terrier.Retriever(index, wmodel="DPH")
In both of the example pipelines above (`p1` and `p2`), the exact topics are not known until the pipeline is invoked, e.g.
by using `p1.transform(topics)` on a topics dataframe, or within a `pt.Experiment()`. When the pipeline
by using `p1.transform(topics)` on a topics dataframe, or within a ``pt.Experiment()``. When the pipeline
is invoked, the specified function (`_remove_stops` in the case of `p1`) is called for **each** row of the
input dataframe (becoming the `q` function argument).
"""
return ApplyQueryTransformer(fn, *args, **kwargs)

def doc_score(fn : Union[Callable[[pd.Series], float], Callable[[pd.DataFrame], Sequence[float]]], *args, batch_size=None, **kwargs) -> pt.Transformer:
def doc_score(fn : Union[Callable[[Union[pd.Series,pt.model.IterDictRecord]], float], Callable[[pd.DataFrame], Sequence[float]]], *args, batch_size=None, **kwargs) -> pt.Transformer:
"""
Create a transformer that takes as input a ranked documents dataframe, and applies a supplied function to compute a new score.
Ranks are automatically computed. doc_score() can operate row-wise, or batch-wise, depending on whether batch_size is set.
@@ -76,7 +78,7 @@ def doc_score(fn : Union[Callable[[pd.Series], float], Callable[[pd.DataFrame],
# this transformer will subtract 5 from the score of each document
p = pt.terrier.Retriever(index, wmodel="DPH") >>
pt.apply.doc_score(lambda doc : doc["score"] - 5)
pt.apply.doc_score(lambda doc : doc["score"] - 5) # doc["score"] works for both a dict and Series
Can be used in batch-wise manner, which is particularly useful for appling neural models. In this case,
the scoring function receives a dataframe, rather than a single row::
@@ -94,12 +96,14 @@ def _doclen(df):
"""
return ApplyDocumentScoringTransformer(fn, *args, batch_size=batch_size, **kwargs)

def doc_features(fn : Callable[[pd.Series], npt.NDArray[Any]], *args, **kwargs) -> pt.Transformer:
def doc_features(fn : Callable[[Union[pd.Series,pt.model.IterDictRecord]], npt.NDArray[Any]], *args, **kwargs) -> pt.Transformer:
"""
Create a transformer that takes as input a ranked documents dataframe, and applies the supplied function to each document to compute feature scores.
The supplied function is called once for each document, and must return a 1D numpy array each time.
Each time it is called, the function is supplied with a Pandas Series representing the attributes of the query and document.
Each time it is called, the function is supplied with a Pandas Series, or a dictionary, representing the attributes of the query and document. The
particular type of the input is not controlled by the implementor, so the function should be written to support both, e.g. using ``row["key"]``
notation and not the ``row.key`` that is supported by a Series.
Arguments:
fn(Callable): the function to apply to each row
@@ -128,7 +132,7 @@ def _features(row):
"""
return ApplyDocFeatureTransformer(fn, *args, **kwargs)

def indexer(fn : Callable[[Iterator[Dict[str,Any]]], Any], **kwargs) -> pt.Indexer:
def indexer(fn : Callable[[pt.model.IterDict], Any], **kwargs) -> pt.Indexer:
"""
Create an instance of pt.Indexer using a function that takes as input an iterable of dictionaries.
@@ -164,7 +168,7 @@ def rename(columns : Dict[str,str], *args, errors='raise', **kwargs) -> pt.Trans
"""
return ApplyGenericTransformer(lambda df: df.rename(columns=columns, errors=errors), *args, **kwargs)

def generic(fn : Callable[[pd.DataFrame], pd.DataFrame], *args, batch_size=None, **kwargs) -> pt.Transformer:
def generic(fn : Union[Callable[[pd.DataFrame], pd.DataFrame], Callable[[pt.model.IterDict], pt.model.IterDict]], *args, batch_size=None, iter=False, **kwargs) -> pt.Transformer:
"""
Create a transformer that changes the input dataframe to another dataframe in an unspecified way.
@@ -175,24 +179,49 @@ def generic(fn : Callable[[pd.DataFrame], pd.DataFrame], *args, batch_size=None,
Arguments:
fn(Callable): the function to apply to each row
batch_size(int or None): whether to apply fn on batches of rows or all that are received
verbose(bool): Whether to display a progress bar over batches (only used if batch_size is set).
Example::
verbose(bool): Whether to display a progress bar over batches (only used if batch_size is set, and iter is not set).
iter(bool): Whether to use the iter-dict API - if so, ``fn`` receives an iterable and returns an iterable.
# this transformer will remove all documents at rank greater than 2.
Example (dataframe)::
# this pipeline would remove all but the first two documents from a result set
pipe = pt.terrier.Retriever(index) >> pt.apply.generic(lambda res : res[res["rank"] < 2])
Example (iter-dict)::
# this pipeline would similarly remove all but the first two documents from a result set
def _fn(iterdict):
for result in iterdict:
if result["rank"] < 2:
yield result
pipe1 = pt.terrier.Retriever(index) >> pt.apply.generic(_fn, iter=True)
# transform_iter() can also return an iterable, so returning a list is also permissible
pipe2 = pt.terrier.Retriever(index) >> pt.apply.generic(lambda res: [row for row in res if row["rank"] < 2], iter=True)
"""
if iter:
if kwargs.get("add_ranks", False):
raise ValueError("add_ranks=True not supported with iter=True")
return ApplyGenericIterTransformer(fn, *args, batch_size=batch_size, **kwargs)
return ApplyGenericTransformer(fn, *args, batch_size=batch_size, **kwargs)

def by_query(fn : Callable[[pd.DataFrame], pd.DataFrame], *args, batch_size=None, **kwargs) -> pt.Transformer:
def by_query(fn : Union[Callable[[pd.DataFrame], pd.DataFrame], Callable[[pt.model.IterDict], pt.model.IterDict]], *args, batch_size=None, iter=False, **kwargs) -> pt.Transformer:
"""
As `pt.apply.generic()` except that fn receives a dataframe for one query at a time, rather than all results at once.
As `pt.apply.generic()` except that fn receives a dataframe (or iter-dict) for one query at a time, rather than all results at once.
If batch_size is set, fn will receive no more than batch_size documents for any query. The verbose kwargs controls whether
to display a progress bar over queries.
"""
Arguments:
fn(Callable): the function to apply to the results of each query. Should return a dataframe (or, with iter=True, an iterable)
batch_size(int or None): whether to apply fn on batches of rows or all that are received.
verbose(bool): Whether to display a progress bar over batches (only used if batch_size is set, and iter is not set).
iter(bool): Whether to use the iter-dict API - if so, ``fn`` receives an iterable and must return an iterable.
"""
if iter:
if kwargs.get("add_ranks", False):
raise ValueError("add_ranks=True not supported with iter=True")
return ApplyIterForEachQuery(fn, *args, batch_size=batch_size, **kwargs)
return ApplyForEachQuery(fn, *args, batch_size=batch_size, **kwargs)
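Conceptually, ``by_query(..., iter=True)`` hands each query's records to ``fn`` separately. The helper below is a hypothetical stdlib sketch of that grouping (not the PyTerrier implementation), and assumes the input records arrive already ordered by qid:

```python
from itertools import groupby


def by_query_iter(fn, records):
    """Hand each query's records to fn separately (records grouped by qid)."""
    for qid, group in groupby(records, key=lambda r: r["qid"]):
        yield from fn(list(group))


def keep_top1(results):  # fn receives the results for one query
    return results[:1]


res = [
    {"qid": "1", "docno": "d1", "rank": 0},
    {"qid": "1", "docno": "d2", "rank": 1},
    {"qid": "2", "docno": "d3", "rank": 0},
]
top = list(by_query_iter(keep_top1, res))
```

Note that ``itertools.groupby`` only merges adjacent records with the same key, which is why the sketch assumes qid-ordered input.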

class _apply:
@@ -209,18 +238,16 @@ def __init__(self):
def __getattr__(self, item):
return partial(generic_apply, item)

def generic_apply(name, *args, drop=False, **kwargs) -> pt.Transformer:
def generic_apply(
name: str,
fn=None,
*,
drop: bool = False,
batch_size: Optional[int] = None,
verbose=False
) -> pt.Transformer:
if drop:
return ApplyGenericTransformer(lambda df : df.drop(name, axis=1), *args, **kwargs)

if len(args) == 0:
raise ValueError("Must specify a fn, e.g. a lambda")

fn = args[0]
args=[]
assert fn is None, "cannot provide both fn and drop=True"
return DropColumnTransformer(name)

def _new_column(df):
df[name] = df.apply(fn, axis=1, result_type='reduce')
return df

return ApplyGenericTransformer(_new_column, *args, **kwargs)
return ApplyByRowTransformer(name, fn, batch_size=batch_size, verbose=verbose)
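The attribute-style behaviour that ``generic_apply`` backs (``pt.apply.<name>(fn)`` adds a column named ``<name>``; ``pt.apply.<name>(drop=True)`` removes it) can be sketched over iter-dicts with a hypothetical stdlib stand-in:

```python
def apply_column(name, fn=None, *, drop=False):
    """Hypothetical stand-in for pt.apply.<name>: returns a callable over
    a list of dicts that adds (or, with drop=True, removes) column <name>."""
    if drop:
        assert fn is None, "cannot provide both fn and drop=True"
        return lambda rows: [{k: v for k, v in r.items() if k != name}
                             for r in rows]
    return lambda rows: [{**r, name: fn(r)} for r in rows]


rows = [{"qid": "1", "query": "chemical reactions"}]
with_qlen = apply_column("qlen", lambda row: len(row["query"].split()))(rows)
without_query = apply_column("query", drop=True)(with_qlen)
```

This mirrors the rewritten ``generic_apply`` above: the drop path needs no user function, while the add path applies ``fn`` row by row.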
