From db8c4cde80735f8960cd91f56279ed3c604c8500 Mon Sep 17 00:00:00 2001
From: Jay Baxter
Date: Mon, 28 Aug 2023 09:49:25 -0700
Subject: [PATCH] Multiple scorer updates

---
 sourcecode/main.py                            |  18 +++-
 sourcecode/scoring/constants.py               |   2 +-
 .../matrix_factorization.py                   |  13 ++-
 .../scoring/matrix_factorization/model.py     |   6 +-
 .../matrix_factorization/pseudo_raters.py     |  79 ++++++++------
 sourcecode/scoring/mf_base_scorer.py          |   2 +-
 sourcecode/scoring/process_data.py            | 101 ++++++++++++------
 sourcecode/scoring/run_scoring.py             |  39 +++++--
 8 files changed, 177 insertions(+), 83 deletions(-)

diff --git a/sourcecode/main.py b/sourcecode/main.py
index fcf3bd75..1542a863 100644
--- a/sourcecode/main.py
+++ b/sourcecode/main.py
@@ -7,7 +7,7 @@
   --notes data/notes-00000.tsv \
   --ratings data/ratings-00000.tsv \
   --status data/noteStatusHistory-00000.tsv \
-  --outdir data/scored_notes.tsv
+  --outdir data
 """
 import argparse
@@ -15,7 +15,7 @@

 import scoring.constants as c
 from scoring.enums import scorers_from_csv
-from scoring.process_data import get_data, write_tsv_local
+from scoring.process_data import LocalDataLoader, write_tsv_local
 from scoring.run_scoring import run_scoring

@@ -82,6 +82,13 @@ def parse_args():
     dest="strict_columns",
   )
   parser.set_defaults(strict_columns=True)
+  parser.add_argument(
+    "--parallel",
+    help="Enable parallel run of algorithm.",
+    action="store_true",
+    dest="parallel",
+  )
+  parser.set_defaults(parallel=False)

   return parser.parse_args()
@@ -93,9 +100,8 @@ def main():
   c.epochMillis = args.epoch_millis

   # Load input dataframes.
-  _, ratings, statusHistory, userEnrollment = get_data(
-    args.notes, args.ratings, args.status, args.enrollment, args.headers
-  )
+  dataLoader = LocalDataLoader(args.notes, args.ratings, args.status, args.enrollment, args.headers)
+  _, ratings, statusHistory, userEnrollment = dataLoader.get_data()

   # Invoke scoring and user contribution algorithms.
   scoredNotes, helpfulnessScores, newStatus, auxNoteInfo = run_scoring(
@@ -106,6 +112,8 @@ def main():
     pseudoraters=args.pseudoraters,
     enabledScorers=args.scorers,
     strictColumns=args.strict_columns,
+    runParallel=args.parallel,
+    dataLoader=dataLoader if args.parallel else None,
   )

   # Write outputs to local disk.
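Reviewer aside: a minimal sketch of the new wiring, not part of the patch. It mirrors main() above; the enrollment file name is hypothetical (the usage string above does not show one). The point is that the same LocalDataLoader used for the up-front load is forwarded to run_scoring when --parallel is set, so each worker process can re-read the inputs itself.

  # Sketch only, assuming the sample file names from the usage string above.
  from scoring.process_data import LocalDataLoader
  from scoring.run_scoring import run_scoring

  dataLoader = LocalDataLoader(
    "data/notes-00000.tsv",
    "data/ratings-00000.tsv",
    "data/noteStatusHistory-00000.tsv",
    "data/userEnrollment-00000.tsv",  # hypothetical enrollment file name
    headers=True,
  )
  _, ratings, statusHistory, userEnrollment = dataLoader.get_data()
  scoredNotes, helpfulnessScores, newStatus, auxNoteInfo = run_scoring(
    ratings,
    statusHistory,
    userEnrollment,
    runParallel=True,
    dataLoader=dataLoader,  # required whenever runParallel=True
  )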
diff --git a/sourcecode/scoring/constants.py b/sourcecode/scoring/constants.py
index a0975a19..6c7e91da 100644
--- a/sourcecode/scoring/constants.py
+++ b/sourcecode/scoring/constants.py
@@ -16,7 +16,7 @@
 maxTrainError = 0.09

-coreFlipPct = 0.175
+coreFlipPct = 0.09
 expansionFlipPct = 0.19
 maxReruns = 5

diff --git a/sourcecode/scoring/matrix_factorization/matrix_factorization.py b/sourcecode/scoring/matrix_factorization/matrix_factorization.py
index 7a4581a4..b0d6b70b 100644
--- a/sourcecode/scoring/matrix_factorization/matrix_factorization.py
+++ b/sourcecode/scoring/matrix_factorization/matrix_factorization.py
@@ -132,7 +132,8 @@ def _initialize_parameters(
     """
     assert self.mf_model is not None
     if noteInit is not None:
-      print("initializing notes")
+      if self._logging:
+        print("initializing notes")
       noteInit = self.noteIdMap.merge(noteInit, on=c.noteIdKey, how="left")
       self.mf_model.note_intercepts.weight.data = torch.tensor(
         np.expand_dims(noteInit[c.internalNoteInterceptKey].astype(np.float32).values, axis=1)
@@ -144,7 +145,8 @@ def _initialize_parameters(
       )

     if userInit is not None:
-      print("initializing users")
+      if self._logging:
+        print("initializing users")
       userInit = self.raterIdMap.merge(userInit, on=c.raterParticipantIdKey, how="left")
       self.mf_model.user_intercepts.weight.data = torch.tensor(
         np.expand_dims(userInit[c.internalRaterInterceptKey].astype(np.float32).values, axis=1)
@@ -156,7 +158,8 @@ def _initialize_parameters(
       )

     if globalInterceptInit is not None:
-      print("initialized global intercept")
+      if self._logging:
+        print("initialized global intercept")
       self.mf_model.global_intercept = torch.nn.parameter.Parameter(
         torch.ones(1, 1) * globalInterceptInit
       )
@@ -214,7 +217,8 @@ def _create_mf_model(
       )  # smaller learning rate
     else:
       self.optimizer = torch.optim.Adam(self.mf_model.parameters(), lr=self._noInitLearningRate)
-    print(self.mf_model.device)
+    if self._logging:
+      print(self.mf_model.device)
     self.mf_model.to(self.mf_model.device)

   def _instantiate_biased_mf_model(self):
@@ -225,6 +229,7 @@ def _instantiate_biased_mf_model(self):
       n_notes,
       use_global_intercept=self._useGlobalIntercept,
       n_factors=self._numFactors,
+      logging=self._logging,
     )
     if self._logging:
       print("------------------")
diff --git a/sourcecode/scoring/matrix_factorization/model.py b/sourcecode/scoring/matrix_factorization/model.py
index 38917949..49632453 100644
--- a/sourcecode/scoring/matrix_factorization/model.py
+++ b/sourcecode/scoring/matrix_factorization/model.py
@@ -20,6 +20,7 @@ def __init__(
     n_notes: int,
     n_factors: int = 1,
     use_global_intercept: bool = True,
+    logging: bool = True,
   ) -> None:
     """Initialize matrix factorization model using xavier_uniform for factors and zeros for intercepts.
@@ -32,6 +33,8 @@ def __init__(
     """
     super().__init__()

+    self._logging = logging
+
     self.user_factors = torch.nn.Embedding(n_users, n_factors, sparse=False)
     self.note_factors = torch.nn.Embedding(n_notes, n_factors, sparse=False)

@@ -78,5 +81,6 @@ def _freeze_parameters(self, words_to_freeze: set):
     for name, param in self.named_parameters():
       for word in words_to_freeze:
         if word in name:
-          print("Freezing parameter: ", name)
+          if self._logging:
+            print("Freezing parameter: ", name)
           param.requires_grad_(False)
diff --git a/sourcecode/scoring/matrix_factorization/pseudo_raters.py b/sourcecode/scoring/matrix_factorization/pseudo_raters.py
index 630d2bf4..823676f9 100644
--- a/sourcecode/scoring/matrix_factorization/pseudo_raters.py
+++ b/sourcecode/scoring/matrix_factorization/pseudo_raters.py
@@ -28,9 +28,11 @@ def __init__(
     globalBias: float,
     mfRanker: MatrixFactorization,
     logging=True,
+    checkParamsSame=True,
   ):
     self._logging = logging
     self._mfRanker = mfRanker
+    self._checkParamsSame = checkParamsSame
     self.ratings = ratings
     (
       self.noteIdMap,
@@ -43,9 +45,12 @@ def __init__(

   def compute_note_parameter_confidence_bounds_with_pseudo_raters(self):
     self._make_extreme_raters(self.raterParams, self.raterIdMap)
+    self._add_extreme_raters_to_id_maps_and_params()
+    self._create_extreme_ratings()

-    notesWithConfidenceBounds = self._fit_note_params_for_each_dataset_with_extreme_ratings()
+    noteParamsList = self._fit_note_params_for_each_dataset_with_extreme_ratings()
+    notesWithConfidenceBounds = self._aggregate_note_params(noteParamsList)
     return self.noteParams.merge(
       notesWithConfidenceBounds.reset_index(), on=c.noteIdKey, how="left"
     )
@@ -170,8 +175,9 @@ def _create_new_model_with_extreme_raters_from_original_params(
       lr=newExtremeMF._initLearningRate,
     )

-    self._check_note_parameters_same(newExtremeMF)
-    self._check_rater_parameters_same(newExtremeMF)
+    if self._checkParamsSame:
+      self._check_note_parameters_same(newExtremeMF)
+      self._check_rater_parameters_same(newExtremeMF)

     return newExtremeMF

@@ -183,15 +189,15 @@ def _fit_all_notes_with_raters_constant(self, ratingFeaturesAndLabelsWithExtreme
     newExtremeMF._fit_model()

     # Double check that we kept rater parameters fixed during re-training of note parameters.
-    self._check_rater_parameters_same(newExtremeMF)
+    if self._checkParamsSame:
+      self._check_rater_parameters_same(newExtremeMF)

     fitNoteParams, fitRaterParams = newExtremeMF._get_parameters_from_trained_model()
     return fitNoteParams

-  def _fit_note_params_for_each_dataset_with_extreme_ratings(self, joinOrig=False):
-    self._add_extreme_raters_to_id_maps_and_params()
-
-    extremeRatingsToAddWithoutNotes = []
-    extremeRatingsToAddWithoutNotes.append(
+  def _create_extreme_ratings(self):
+    self.extremeRatingsToAddWithoutNotes = []
+    self.extremeRatingsToAddWithoutNotes.append(
       {
         c.internalRaterInterceptKey: None,
         c.internalRaterFactor1Key: None,
@@ -203,37 +209,44 @@ def _fit_note_params_for_each_dataset_with_extreme_ratings(self, joinOrig=False)
     for helpfulNum in (0.0, 1.0):
       extremeRater[c.helpfulNumKey] = helpfulNum
-      extremeRatingsToAddWithoutNotes.append(extremeRater.copy())
+      self.extremeRatingsToAddWithoutNotes.append(extremeRater.copy())
+
+  def _create_dataset_with_extreme_rating_on_each_note(self, ratingToAddWithoutNoteId):
+    ## for each rating (identified by raterParticipantId and raterIndex)
+    if ratingToAddWithoutNoteId[c.helpfulNumKey] is not None:
+      ratingsWithNoteIds = []
+      for i, noteRow in (
+        self.ratingFeaturesAndLabels[[c.noteIdKey, mf_c.noteIndexKey]].drop_duplicates().iterrows()
+      ):
+        ratingToAdd = ratingToAddWithoutNoteId.copy()
+        ratingToAdd[c.noteIdKey] = noteRow[c.noteIdKey]
+        ratingToAdd[mf_c.noteIndexKey] = noteRow[mf_c.noteIndexKey]
+        ratingsWithNoteIds.append(ratingToAdd)
+      extremeRatingsToAdd = pd.DataFrame(ratingsWithNoteIds).drop(
+        [c.internalRaterInterceptKey, c.internalRaterFactor1Key], axis=1
+      )
+      ratingFeaturesAndLabelsWithExtremeRatings = pd.concat(
+        [self.ratingFeaturesAndLabels, extremeRatingsToAdd]
+      )
+    else:
+      ratingFeaturesAndLabelsWithExtremeRatings = self.ratingFeaturesAndLabels
+    return ratingFeaturesAndLabelsWithExtremeRatings

+  def _fit_note_params_for_each_dataset_with_extreme_ratings(self):
     noteParamsList = []
-    for ratingToAddWithoutNoteId in extremeRatingsToAddWithoutNotes:
-      ## for each rating (ided by raterParticipantId and raterIndex)
-      if ratingToAddWithoutNoteId[c.helpfulNumKey] is not None:
-        ratingsWithNoteIds = []
-        for i, noteRow in (
-          self.ratingFeaturesAndLabels[[c.noteIdKey, mf_c.noteIndexKey]]
-          .drop_duplicates()
-          .iterrows()
-        ):
-          ratingToAdd = ratingToAddWithoutNoteId.copy()
-          ratingToAdd[c.noteIdKey] = noteRow[c.noteIdKey]
-          ratingToAdd[mf_c.noteIndexKey] = noteRow[mf_c.noteIndexKey]
-          ratingsWithNoteIds.append(ratingToAdd)
-        extremeRatingsToAdd = pd.DataFrame(ratingsWithNoteIds).drop(
-          [c.internalRaterInterceptKey, c.internalRaterFactor1Key], axis=1
-        )
-        ratingFeaturesAndLabelsWithExtremeRatings = pd.concat(
-          [self.ratingFeaturesAndLabels, extremeRatingsToAdd]
-        )
-      else:
-        ratingFeaturesAndLabelsWithExtremeRatings = self.ratingFeaturesAndLabels
+    for ratingToAddWithoutNoteId in self.extremeRatingsToAddWithoutNotes:
+      ratingFeaturesAndLabelsWithExtremeRatings = (
+        self._create_dataset_with_extreme_rating_on_each_note(ratingToAddWithoutNoteId)
+      )
       if self._logging:
         print("------------------")
         print(f"Re-scoring all notes with extra rating added: {ratingToAddWithoutNoteId}")
+
       fitNoteParams = self._fit_all_notes_with_raters_constant(
         ratingFeaturesAndLabelsWithExtremeRatings
       )
+
       fitNoteParams[Constants.extraRaterInterceptKey] = ratingToAddWithoutNoteId[
         c.internalRaterInterceptKey
       ]
       fitNoteParams[Constants.extraRaterFactor1Key] = ratingToAddWithoutNoteId[
         c.internalRaterFactor1Key
       ]
       fitNoteParams[Constants.extraRatingHelpfulNumKey] = ratingToAddWithoutNoteId[c.helpfulNumKey]
       noteParamsList.append(fitNoteParams)
+    return noteParamsList

+  def _aggregate_note_params(self, noteParamsList, joinOrig=False):
     rawRescoredNotesWithEachExtraRater = pd.concat(noteParamsList)
     rawRescoredNotesWithEachExtraRater.drop(mf_c.noteIndexKey, axis=1, inplace=True)
     rawRescoredNotesWithEachExtraRater = rawRescoredNotesWithEachExtraRater.sort_values(
@@ -278,9 +293,9 @@ def _fit_note_params_for_each_dataset_with_extreme_ratings(self, joinOrig=False)
     raterFacs[Constants.allKey] = 1
     raterFacs[Constants.negFacKey] = raterFacs[c.internalRaterFactor1Key] < 0
     raterFacs[Constants.posFacKey] = raterFacs[c.internalRaterFactor1Key] > 0
-    r = raterFacs.groupby(c.noteIdKey).sum()[
+    r = raterFacs.groupby(c.noteIdKey)[
       [Constants.allKey, Constants.negFacKey, Constants.posFacKey]
-    ]
+    ].sum()
     r.columns = pd.MultiIndex.from_product([[c.ratingCountKey], r.columns])
     notesWithConfidenceBounds = notesWithConfidenceBounds.join(r)
diff --git a/sourcecode/scoring/mf_base_scorer.py b/sourcecode/scoring/mf_base_scorer.py
index 40c39d9a..da776174 100644
--- a/sourcecode/scoring/mf_base_scorer.py
+++ b/sourcecode/scoring/mf_base_scorer.py
@@ -32,7 +32,7 @@ def __init__(
     crnhThresholdNoteFactorMultiplier: float = -0.8,
     crnhThresholdNMIntercept: float = -0.15,
     crnhThresholdUCBIntercept: float = -0.04,
-    crhThresholdLCBIntercept: float = 0.32,
+    crhThresholdLCBIntercept: float = 0.35,
     crhSuperThreshold: float = 0.5,
     inertiaDelta: float = 0.01,
     weightedTotalVotes: float = 1.0,
diff --git a/sourcecode/scoring/process_data.py b/sourcecode/scoring/process_data.py
index 61ffabba..4fa36af0 100644
--- a/sourcecode/scoring/process_data.py
+++ b/sourcecode/scoring/process_data.py
@@ -1,3 +1,4 @@
+from abc import ABC, abstractmethod
 from io import StringIO
 from typing import Dict, List, Optional, Tuple

@@ -8,38 +9,6 @@
 import pandas as pd


-def get_data(
-  notesPath: str,
-  ratingsPath: str,
-  noteStatusHistoryPath: str,
-  userEnrollmentPath: str,
-  headers: bool,
-  shouldFilterNotMisleadingNotes: bool = True,
-  logging: bool = True,
-) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
-  """All-in-one function for reading Birdwatch notes and ratings from TSV files.
-  It does both reading and pre-processing.
-
-  Args:
-      notesPath (str): file path
-      ratingsPath (str): file path
-      noteStatusHistoryPath (str): file path
-      headers: If true, expect first row of input files to be headers.
-      shouldFilterNotMisleadingNotes (bool, optional): Throw out not-misleading notes if True. Defaults to True.
-      logging (bool, optional): Print out debug output. Defaults to True.
-
-  Returns:
-      Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: notes, ratings, noteStatusHistory, userEnrollment
-  """
-  notes, ratings, noteStatusHistory, userEnrollment = read_from_tsv(
-    notesPath, ratingsPath, noteStatusHistoryPath, userEnrollmentPath, headers
-  )
-  notes, ratings, noteStatusHistory = preprocess_data(
-    notes, ratings, noteStatusHistory, shouldFilterNotMisleadingNotes, logging
-  )
-  return notes, ratings, noteStatusHistory, userEnrollment
-
-
 def read_from_strings(
   notesStr: str, ratingsStr: str, noteStatusHistoryStr: str
 ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
@@ -514,3 +483,71 @@ def write_tsv_local(df: pd.DataFrame, path: str) -> None:

   assert path is not None
   assert df.to_csv(path, index=False, header=True, sep="\t") is None
+
+
+class CommunityNotesDataLoader(ABC):
+  """Base class which local and prod data loaders extend.
+
+  Subclasses store the locations of their input data and define a "get_data" function,
+  which can be passed to parallel scoring so that each scoring process loads the data itself.
+  """
+
+  def __init__(self) -> None:
+    """Configure a new CommunityNotesDataLoader object."""
+
+  @abstractmethod
+  def get_data(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
+    """Returns notes, ratings, noteStatusHistory, and userEnrollment DataFrames."""
+
+
+class LocalDataLoader(CommunityNotesDataLoader):
+  def __init__(
+    self,
+    notesPath: str,
+    ratingsPath: str,
+    noteStatusHistoryPath: str,
+    userEnrollmentPath: str,
+    headers: bool,
+    shouldFilterNotMisleadingNotes: bool = True,
+    logging: bool = True,
+  ) -> None:
+    """
+    Args:
+        notesPath (str): file path
+        ratingsPath (str): file path
+        noteStatusHistoryPath (str): file path
+        userEnrollmentPath (str): file path
+        headers: If true, expect first row of input files to be headers.
+        shouldFilterNotMisleadingNotes (bool, optional): Throw out not-misleading notes if True. Defaults to True.
+        logging (bool, optional): Print out debug output. Defaults to True.
+    """
+    self.notesPath = notesPath
+    self.ratingsPath = ratingsPath
+    self.noteStatusHistoryPath = noteStatusHistoryPath
+    self.userEnrollmentPath = userEnrollmentPath
+    self.headers = headers
+    self.shouldFilterNotMisleadingNotes = shouldFilterNotMisleadingNotes
+    self.logging = logging
+
+  def get_data(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
+    """All-in-one function for reading Birdwatch notes and ratings from TSV files.
+    It does both reading and pre-processing.
+
+    Returns:
+        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: notes, ratings, noteStatusHistory, userEnrollment
+    """
+    notes, ratings, noteStatusHistory, userEnrollment = read_from_tsv(
+      self.notesPath,
+      self.ratingsPath,
+      self.noteStatusHistoryPath,
+      self.userEnrollmentPath,
+      self.headers,
+    )
+    notes, ratings, noteStatusHistory = preprocess_data(
+      notes, ratings, noteStatusHistory, self.shouldFilterNotMisleadingNotes, self.logging
+    )
+    return notes, ratings, noteStatusHistory, userEnrollment
diff --git a/sourcecode/scoring/run_scoring.py b/sourcecode/scoring/run_scoring.py
index e9c60190..0bb2b054 100644
--- a/sourcecode/scoring/run_scoring.py
+++ b/sourcecode/scoring/run_scoring.py
@@ -18,6 +18,7 @@
 from .mf_core_scorer import MFCoreScorer
 from .mf_expansion_scorer import MFExpansionScorer
 from .mf_group_scorer import coalesce_group_models, groupScorerCount, MFGroupScorer
+from .process_data import CommunityNotesDataLoader
 from .scorer import Scorer
 from .scoring_rules import RuleID

@@ -163,7 +164,20 @@ def _merge_results(
   return scoredNotes, helpfulnessScores, auxiliaryNoteInfo


-def _run_scorer_parallelizable(scorer, ratings, noteStatusHistory, userEnrollment):
+def _run_scorer_parallelizable(
+  scorer: Scorer,
+  runParallel: bool,
+  ratings: Optional[pd.DataFrame] = None,
+  noteStatusHistory: Optional[pd.DataFrame] = None,
+  userEnrollment: Optional[pd.DataFrame] = None,
+  dataLoader: Optional[CommunityNotesDataLoader] = None,
+):
+
+  if runParallel:
+    assert dataLoader is not None, "must provide a dataLoader to run parallel"
+    if dataLoader is not None:
+      _, ratings, noteStatusHistory, userEnrollment = dataLoader.get_data()
+
   runCounter = 0
   scorerStartTime = time.perf_counter()
   result = None
@@ -216,7 +230,9 @@ def _run_scorers(
   ratings: pd.DataFrame,
   noteStatusHistory: pd.DataFrame,
   userEnrollment: pd.DataFrame,
-  runParallel: bool = False,
+  runParallel: bool = True,
+  maxWorkers: Optional[int] = None,
+  dataLoader: Optional[CommunityNotesDataLoader] = None,
 ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
   """Applies all Community Notes models to user ratings and returns merged result.

@@ -241,17 +257,17 @@ def _run_scorers(
   overallStartTime = time.perf_counter()
   if runParallel:
     with concurrent.futures.ProcessPoolExecutor(
-      mp_context=multiprocessing.get_context("spawn")
+      mp_context=multiprocessing.get_context("spawn"), max_workers=maxWorkers
     ) as executor:
       print(f"Starting parallel scorer execution with {len(scorers)} scorers.")
       futures = [
-        executor.submit(_run_scorer_parallelizable, s, ratings, noteStatusHistory, userEnrollment)
-        for s in scorers
+        executor.submit(_run_scorer_parallelizable, s, True, dataLoader=dataLoader) for s in scorers
       ]
       modelResultsAndTimes = [f.result() for f in futures]
   else:
     modelResultsAndTimes = [
-      _run_scorer_parallelizable(s, ratings, noteStatusHistory, userEnrollment) for s in scorers
+      _run_scorer_parallelizable(s, False, ratings, noteStatusHistory, userEnrollment)
+      for s in scorers
     ]

   modelResults, scorerTimes, runCounts = zip(*modelResultsAndTimes)
@@ -608,6 +624,8 @@ def run_scoring(
   pseudoraters: Optional[bool] = True,
   enabledScorers: Optional[Set[Scorers]] = None,
   strictColumns: bool = True,
+  runParallel: bool = True,
+  dataLoader: Optional[CommunityNotesDataLoader] = None,
 ):
   """Invokes note scoring algorithms, merges results and computes user stats.
@@ -619,6 +637,8 @@ def run_scoring(
     pseudoraters (bool, optional): if True, compute optional pseudorater confidence intervals
     enabledScorers (Set[Scorers], optional): Scorers which should be instantiated
     strictColumns (bool, optional): if True, validate which columns are present
+    runParallel (bool, optional): if True, run the scorers in parallel
+    dataLoader (CommunityNotesDataLoader, optional): data loader used during parallel execution to re-load input data in each worker process

   Returns:
     Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
@@ -630,7 +650,12 @@ def run_scoring(
   # Apply individual scoring models and obtained merged result.
   scorers = _get_scorers(seed, pseudoraters, enabledScorers)
   scoredNotes, helpfulnessScores, auxiliaryNoteInfo = _run_scorers(
-    list(chain(*scorers.values())), ratings, noteStatusHistory, userEnrollment
+    list(chain(*scorers.values())),
+    ratings,
+    noteStatusHistory,
+    userEnrollment,
+    runParallel,
+    dataLoader=dataLoader,
   )

   # Augment scoredNotes and auxiliaryNoteInfo with additional attributes for each note
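Reviewer aside on the design: in parallel mode _run_scorers submits only the scorer and the loader to each spawned process, and the worker calls dataLoader.get_data() itself, presumably to avoid pickling the full ratings DataFrames across the spawn boundary. Any loader implementing the new CommunityNotesDataLoader interface can plug in; below is a minimal hypothetical subclass for illustration only (ParquetDataLoader and the parquet file names are invented, not part of this patch).

  # Illustration only: a hypothetical loader showing the shape of the interface.
  from typing import Tuple

  import pandas as pd

  from scoring.process_data import CommunityNotesDataLoader


  class ParquetDataLoader(CommunityNotesDataLoader):
    """Hypothetical loader reading pre-converted parquet files instead of TSVs."""

    def __init__(self, basePath: str) -> None:
      self.basePath = basePath

    def get_data(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
      # Each parallel worker calls this independently, so nothing heavier than
      # the path string needs to cross the process boundary.
      notes = pd.read_parquet(f"{self.basePath}/notes.parquet")
      ratings = pd.read_parquet(f"{self.basePath}/ratings.parquet")
      noteStatusHistory = pd.read_parquet(f"{self.basePath}/noteStatusHistory.parquet")
      userEnrollment = pd.read_parquet(f"{self.basePath}/userEnrollment.parquet")
      return notes, ratings, noteStatusHistory, userEnrollment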