Skip to content

Commit

Permalink
Merge pull request #155 from twitter/jbaxter/2023_09_21_scorer
Browse files Browse the repository at this point in the history
Scorer update: stable initialization, incorrect tag update, refactoring
  • Loading branch information
jbaxter authored Sep 21, 2023
2 parents d5f23ba + bf1d6fa commit eab56ba
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 154 deletions.
112 changes: 1 addition & 111 deletions sourcecode/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,117 +10,7 @@
--outdir data
"""

import argparse
import os

import scoring.constants as c
from scoring.enums import scorers_from_csv
from scoring.process_data import LocalDataLoader, write_tsv_local
from scoring.run_scoring import run_scoring


def parse_args(argv=None):
  """Build the Community Notes scoring command-line parser and parse arguments.

  Paired flags (--headers/--noheaders, --pseudoraters/--nopseudoraters,
  --strict-columns/--nostrict-columns) write to a shared destination, so the
  last one given on the command line wins; each pair defaults to True.

  Args:
    argv: optional list of argument strings to parse. When None (the default),
      argparse reads the arguments from sys.argv.

  Returns:
    argparse.Namespace with attributes enrollment, epoch_millis, headers, notes,
    outdir, pseudoraters, ratings, scorers, seed, status, strict_columns and
    parallel.
  """
  # NOTE: the first positional parameter of ArgumentParser is `prog`, so this
  # string is shown as the program name in usage/help output.
  parser = argparse.ArgumentParser("Community Notes Scoring")
  parser.add_argument(
    "-e", "--enrollment", default=c.enrollmentInputPath, help="note enrollment dataset"
  )
  parser.add_argument(
    "--epoch-millis",
    default=None,
    type=float,
    dest="epoch_millis",
    help="timestamp in milliseconds since epoch to treat as now",
  )
  parser.add_argument(
    "--headers",
    dest="headers",
    help="First row of input files should be a header",
    action="store_true",
  )
  parser.add_argument(
    "--noheaders",
    dest="headers",
    help="First row of input files should be data. There should be no headers.",
    action="store_false",
  )
  parser.set_defaults(headers=True)
  parser.add_argument("-n", "--notes", default=c.notesInputPath, help="note dataset")
  parser.add_argument("-o", "--outdir", default=".", help="directory for output files")
  parser.add_argument(
    "--pseudoraters",
    dest="pseudoraters",
    help="Include calculation of pseudorater intervals",
    action="store_true",
  )
  parser.add_argument(
    "--nopseudoraters",
    dest="pseudoraters",
    help="Exclude calculation of pseudorater intervals (faster)",
    action="store_false",
  )
  parser.set_defaults(pseudoraters=True)
  parser.add_argument("-r", "--ratings", default=c.ratingsInputPath, help="rating dataset")
  parser.add_argument(
    "--scorers", default=None, type=scorers_from_csv, help="CSV list of scorers to enable."
  )
  parser.add_argument(
    "--seed", default=None, type=int, help="set to an int to seed matrix factorization"
  )
  parser.add_argument(
    "-s", "--status", default=c.noteStatusHistoryInputPath, help="note status history dataset"
  )
  parser.add_argument(
    "--strict-columns",
    dest="strict_columns",
    help="Explicitly select columns and require that expected columns are present.",
    action="store_true",
  )
  parser.add_argument(
    "--nostrict-columns",
    help="Disable validation of expected columns and allow unexpected columns.",
    action="store_false",
    dest="strict_columns",
  )
  parser.set_defaults(strict_columns=True)
  # store_true with a False default: passing --parallel turns parallel mode ON,
  # so the help text must say "Enable" (it previously said "Disable").
  parser.add_argument(
    "--parallel",
    help="Enable parallel run of algorithm.",
    action="store_true",
    dest="parallel",
  )
  parser.set_defaults(parallel=False)

  return parser.parse_args(argv)


def main():
  """Run Community Notes scoring end to end from the command line.

  Parses the CLI arguments, optionally overrides the "current time" constant,
  loads the input datasets from local files, runs the scoring algorithms, and
  writes four TSV outputs (scored_notes, helpfulness_scores,
  note_status_history, aux_note_info) into the requested output directory.
  """
  # Parse arguments and fix timestamp, if applicable.
  args = parse_args()
  # Compare against None rather than truthiness so that an explicit
  # `--epoch-millis 0` is honored instead of being silently ignored.
  if args.epoch_millis is not None:
    c.epochMillis = args.epoch_millis

  # Load input dataframes.
  dataLoader = LocalDataLoader(args.notes, args.ratings, args.status, args.enrollment, args.headers)
  # The first element returned by get_data() is not needed here
  # (presumably the notes dataframe -- confirm against LocalDataLoader).
  _, ratings, statusHistory, userEnrollment = dataLoader.get_data()

  # Invoke scoring and user contribution algorithms.
  scoredNotes, helpfulnessScores, newStatus, auxNoteInfo = run_scoring(
    ratings,
    statusHistory,
    userEnrollment,
    seed=args.seed,
    pseudoraters=args.pseudoraters,
    enabledScorers=args.scorers,
    strictColumns=args.strict_columns,
    runParallel=args.parallel,
    # The data loader is only handed to run_scoring when running in parallel.
    dataLoader=dataLoader if args.parallel else None,
  )

  # Write outputs to local disk.
  write_tsv_local(scoredNotes, os.path.join(args.outdir, "scored_notes.tsv"))
  write_tsv_local(helpfulnessScores, os.path.join(args.outdir, "helpfulness_scores.tsv"))
  write_tsv_local(newStatus, os.path.join(args.outdir, "note_status_history.tsv"))
  write_tsv_local(auxNoteInfo, os.path.join(args.outdir, "aux_note_info.tsv"))
from scoring.runner import main


if __name__ == "__main__":
Expand Down
12 changes: 12 additions & 0 deletions sourcecode/scoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ def rater_factor_key(i):
timestampMillisOfStatusLockKey = "timestampMillisOfStatusLock"
lockedStatusKey = "lockedStatus"
timestampMillisOfRetroLockKey = "timestampMillisOfRetroLock"
currentCoreStatusKey = "currentCoreStatus"
currentExpansionStatusKey = "currentExpansionStatus"
currentGroupStatusKey = "currentGroupStatus"
currentDecidedByKey = "currentDecidedBy"
currentModelingGroupKey = "currentModelingGroup"

noteStatusHistoryTSVColumnsAndTypes = [
(noteIdKey, np.int64),
Expand All @@ -324,13 +329,19 @@ def rater_factor_key(i):
(timestampMillisOfStatusLockKey, np.double), # double because nullable.
(lockedStatusKey, object),
(timestampMillisOfRetroLockKey, np.double), # double because nullable.
(currentCoreStatusKey, object),
(currentExpansionStatusKey, object),
(currentGroupStatusKey, object),
(currentDecidedByKey, object),
(currentModelingGroupKey, object),
]
noteStatusHistoryTSVColumns = [col for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes]
noteStatusHistoryTSVTypes = [dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes]
noteStatusHistoryTSVTypeMapping = {
col: dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes
}


# Earn In + Earn Out
enrollmentState = "enrollmentState"
successfulRatingNeededToEarnIn = "successfulRatingNeededToEarnIn"
Expand Down Expand Up @@ -487,6 +498,7 @@ def rater_factor_key(i):
(groupNoteInterceptMinKey, np.double),
(modelingGroupKey, np.float64),
(numRatingsKey, np.int64),
(timestampMillisOfNoteCurrentLabelKey, np.double),
]
noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes]
noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes}
Expand Down
12 changes: 12 additions & 0 deletions sourcecode/scoring/matrix_factorization/matrix_factorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def _initialize_parameters(
globalInterceptInit: Optional[float] = None,
) -> None:
"""Overwrite the parameters of the model with the given initializations.
Set parameters to 0.0 if they were not set in the passed initializations.
Args:
mf_model (BiasedMatrixFactorization)
Expand All @@ -135,9 +136,14 @@ def _initialize_parameters(
if self._logging:
print("initializing notes")
noteInit = self.noteIdMap.merge(noteInit, on=c.noteIdKey, how="left")

noteInit[c.internalNoteInterceptKey].fillna(0.0, inplace=True)
self.mf_model.note_intercepts.weight.data = torch.tensor(
np.expand_dims(noteInit[c.internalNoteInterceptKey].astype(np.float32).values, axis=1)
)

for i in range(1, self._numFactors + 1):
noteInit[c.note_factor_key(i)].fillna(0.0, inplace=True)
self.mf_model.note_factors.weight.data = torch.tensor(
noteInit[[c.note_factor_key(i) for i in range(1, self._numFactors + 1)]]
.astype(np.float32)
Expand All @@ -148,9 +154,14 @@ def _initialize_parameters(
if self._logging:
print("initializing users")
userInit = self.raterIdMap.merge(userInit, on=c.raterParticipantIdKey, how="left")

userInit[c.internalRaterInterceptKey].fillna(0.0, inplace=True)
self.mf_model.user_intercepts.weight.data = torch.tensor(
np.expand_dims(userInit[c.internalRaterInterceptKey].astype(np.float32).values, axis=1)
)

for i in range(1, self._numFactors + 1):
userInit[c.rater_factor_key(i)].fillna(0.0, inplace=True)
self.mf_model.user_factors.weight.data = torch.tensor(
userInit[[c.rater_factor_key(i) for i in range(1, self._numFactors + 1)]]
.astype(np.float32)
Expand Down Expand Up @@ -412,6 +423,7 @@ def run_mf(
globalIntercept: learned global intercept parameter
"""
self._initialize_note_and_rater_id_maps(ratings)

self._create_mf_model(noteInit, userInit, globalInterceptInit)
assert self.mf_model is not None

Expand Down
73 changes: 70 additions & 3 deletions sourcecode/scoring/mf_base_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
crhSuperThreshold: float = 0.5,
inertiaDelta: float = 0.01,
weightedTotalVotes: float = 2.5,
useStableInitialization: bool = True,
):
"""Configure MatrixFactorizationScorer object.
Expand Down Expand Up @@ -69,6 +70,8 @@ def __init__(
repeated reason tags in not-helpful ratings to achieve CRH status.
inertiaDelta: Minimum amount which a note that has achieved CRH status must drop below the
applicable threshold to lose CRH status.
weightedTotalVotes: Minimum number of weighted incorrect votes required to lose CRH status.
useStableInitialization: whether to use a specific modeling group of users to stably initialize
the matrix factorization before training on the full dataset.
"""
super().__init__(seed)
self._pseudoraters = pseudoraters
Expand All @@ -87,6 +90,7 @@ def __init__(
self._crhSuperThreshold = crhSuperThreshold
self._inertiaDelta = inertiaDelta
self._weightedTotalVotes = weightedTotalVotes
self._modelingGroupToInitializeForStability = 13 if useStableInitialization else None
self._mfRanker = MatrixFactorization()

def get_crh_threshold(self) -> float:
Expand Down Expand Up @@ -155,8 +159,70 @@ def _prepare_data_for_scoring(self, ratings: pd.DataFrame) -> pd.DataFrame:
ratings, self._minNumRatingsPerRater, self._minNumRatersPerNote
)

def _run_regular_matrix_factorization(self, ratingsForTraining: pd.DataFrame):
  """Fit this scorer's matrix factorization model directly on the given ratings.

  No custom initialization is supplied; the ratings are handed straight to the
  scorer's own ranker (self._mfRanker).

  Args:
    ratingsForTraining (pd.DataFrame): ratings to train on

  Returns:
    Whatever self._mfRanker.run_mf returns, i.e.:
      noteParams (pd.DataFrame)
      raterParams (pd.DataFrame)
      globalIntercept (float)
  """
  trainedParams = self._mfRanker.run_mf(ratingsForTraining)
  return trainedParams

def _run_stable_matrix_factorization(
  self,
  ratingsForTraining: pd.DataFrame,
  userEnrollmentRaw: pd.DataFrame,
):
  """Train matrix factorization with a warm start from one modeling group.

  Training on the whole dataset without initialization can be unstable: for
  subsets of the data with little rater crossover, factors may flip relative to
  each other between runs. To avoid that, training happens in two passes:
    1. Fit a fresh model (same arguments as the main ranker) on only the ratings
       from raters in the configured stable-initialization modeling group.
    2. Fit the main ranker on all ratings, initialized from the pass-1 results.

  If no stable-initialization group is configured (the attribute is None), this
  falls back to a single regular training pass.

  Args:
    ratingsForTraining (pd.DataFrame): ratings to train on
    userEnrollmentRaw (pd.DataFrame): supplies each participant's modeling group

  Returns:
    noteParams (pd.DataFrame)
    raterParams (pd.DataFrame)
    globalIntercept (float)
  """
  # Guard clause: stable initialization disabled -> plain training.
  if self._modelingGroupToInitializeForStability is None:
    return self._run_regular_matrix_factorization(ratingsForTraining)

  # Attach each rater's modeling group to their ratings.
  raterGroups = userEnrollmentRaw[[c.participantIdKey, c.modelingGroupKey]]
  ratingsWithGroup = ratingsForTraining.merge(
    raterGroups,
    left_on=c.raterParticipantIdKey,
    right_on=c.participantIdKey,
  )

  # Keep only ratings made by raters in the stable-initialization group.
  inStableGroup = (
    ratingsWithGroup[c.modelingGroupKey] == self._modelingGroupToInitializeForStability
  )
  stableRatings = ratingsWithGroup[inStableGroup]
  assert len(stableRatings) > 0, "No ratings from stable initialization modeling group."

  # Pass 1: a separate model instance trained on the stable subset only.
  warmStartModel = self._mfRanker.get_new_mf_with_same_args()
  noteWarmStart, raterWarmStart, interceptWarmStart = warmStartModel.run_mf(stableRatings)

  # Pass 2: the main model trained on everything, seeded with pass-1 parameters.
  return self._mfRanker.run_mf(
    ratingsForTraining,
    noteInit=noteWarmStart,
    userInit=raterWarmStart,
    globalInterceptInit=interceptWarmStart,
  )

def _score_notes_and_users(
self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame
self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollmentRaw: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Run the matrix factorization scoring algorithm.
Expand All @@ -167,6 +233,7 @@ def _score_notes_and_users(
Args:
ratings (pd.DataFrame): preprocessed ratings
noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
userEnrollmentRaw (pd.DataFrame): one row per user; enrollment status for each user
Returns:
Tuple[pd.DataFrame, pd.DataFrame]:
Expand All @@ -182,8 +249,8 @@ def _score_notes_and_users(
ratingsForTraining = self._prepare_data_for_scoring(ratings)

# TODO: Save parameters from this first run in note_model_output next time we add extra fields to model output TSV.
noteParamsUnfiltered, raterParamsUnfiltered, globalBias = self._mfRanker.run_mf(
ratingsForTraining,
noteParamsUnfiltered, raterParamsUnfiltered, globalBias = self._run_stable_matrix_factorization(
ratingsForTraining, userEnrollmentRaw
)

# Get a dataframe of scored notes based on the algorithm results above
Expand Down
3 changes: 2 additions & 1 deletion sourcecode/scoring/mf_core_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(
seed: Optional[int] = None,
pseudoraters: Optional[bool] = False,
core_threshold: float = 0.5,
useStableInitialization: bool = True,
) -> None:
"""Configure MFCoreScorer object.
Expand All @@ -27,7 +28,7 @@ def __init__(
core_threshold: float specifying the fraction of reviews which must be from CORE users
for a note to be in scope for the CORE model.
"""
super().__init__(seed, pseudoraters)
super().__init__(seed, pseudoraters, useStableInitialization=useStableInitialization)
self._core_threshold = core_threshold

def _get_note_col_mapping(self) -> Dict[str, str]:
Expand Down
7 changes: 2 additions & 5 deletions sourcecode/scoring/mf_expansion_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@


class MFExpansionScorer(MFBaseScorer):
def __init__(
self,
seed: Optional[int] = None,
) -> None:
def __init__(self, seed: Optional[int] = None, useStableInitialization: bool = True) -> None:
"""Configure MFExpansionScorer object.
Args:
seed: if not None, seed value to ensure deterministic execution
"""
super().__init__(seed, pseudoraters=False)
super().__init__(seed, pseudoraters=False, useStableInitialization=useStableInitialization)

def _get_note_col_mapping(self) -> Dict[str, str]:
"""Returns a dict mapping default note column names to custom names for a specific model."""
Expand Down
4 changes: 2 additions & 2 deletions sourcecode/scoring/mf_group_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


# Number of MFGroupScorer objects we expect to instantiate
groupScorerCount = 12
groupScorerCount = 13


def _coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame:
Expand Down Expand Up @@ -108,7 +108,7 @@ def __init__(
groupThreshold: float indicating what fraction of ratings must be from within a group
for the model to be active
"""
super().__init__(seed, pseudoraters)
super().__init__(seed, pseudoraters, useStableInitialization=False)
assert groupNumber > 0, "groupNumber must be positive. 0 is reserved for unassigned."
assert groupNumber <= groupScorerCount, "groupNumber exceeds maximum expected groups."
self._groupNumber = groupNumber
Expand Down
Loading

0 comments on commit eab56ba

Please sign in to comment.