diff --git a/sourcecode/scoring/constants.py b/sourcecode/scoring/constants.py index a3f16649..51f213bc 100644 --- a/sourcecode/scoring/constants.py +++ b/sourcecode/scoring/constants.py @@ -16,6 +16,7 @@ maxTrainError = 0.09 +coreFlipPct = 0.15 expansionFlipPct = 0.19 maxReruns = 5 @@ -41,6 +42,7 @@ summaryKey = "summary" authorTopNotHelpfulTagValues = "authorTopNotHelpfulTagValues" modelingPopulationKey = "modelingPopulation" +modelingGroupKey = "modelingGroup" # TSV Values notHelpfulValueTsv = "NOT_HELPFUL" @@ -127,6 +129,14 @@ def rater_factor_key(i): coverageRatingStatusKey = "coverageRatingStatus" coverageNoteInterceptMaxKey = "coverageNoteInterceptMax" coverageNoteInterceptMinKey = "coverageNoteInterceptMin" +# Group Model +groupNoteInterceptKey = "groupNoteIntercept" +groupNoteFactor1Key = "groupNoteFactor1" +groupRatingStatusKey = "groupRatingStatus" +groupNoteInterceptMaxKey = "groupNoteInterceptMax" +groupNoteInterceptMinKey = "groupNoteInterceptMin" +groupRaterInterceptKey = "groupRaterIntercept" +groupRaterFactor1Key = "groupRaterFactor1" # Ids and Indexes noteIdKey = "noteId" @@ -364,6 +374,16 @@ def rater_factor_key(i): userEnrollmentTSVTypes = [dtype for (_, dtype) in userEnrollmentTSVColumnsAndTypes] userEnrollmentTSVTypeMapping = {col: dtype for (col, dtype) in userEnrollmentTSVColumnsAndTypes} +# TODO: delete expanded user enrollment definition once modeling group is fully rolled out +userEnrollmentExpandedTSVColumnsAndTypes = userEnrollmentTSVColumnsAndTypes + [ + (modelingGroupKey, np.float64) +] +userEnrollmentExpandedTSVColumns = [col for (col, _) in userEnrollmentExpandedTSVColumnsAndTypes] +userEnrollmentExpandedTSVTypes = [dtype for (_, dtype) in userEnrollmentExpandedTSVColumnsAndTypes] +userEnrollmentExpandedTSVTypeMapping = { + col: dtype for (col, dtype) in userEnrollmentExpandedTSVColumnsAndTypes +} + noteInterceptMaxKey = "internalNoteIntercept_max" noteInterceptMinKey = "internalNoteIntercept_min" noteParameterUncertaintyTSVMainColumnsAndTypes = [ @@ -421,6 +441,16 @@ def rater_factor_key(i): + incorrectFilterColumns ) +deprecatedNoteModelOutputColumns = frozenset( + { + coverageNoteInterceptKey, + coverageNoteFactor1Key, + coverageRatingStatusKey, + coverageNoteInterceptMinKey, + coverageNoteInterceptMaxKey, + } +) + noteModelOutputTSVColumnsAndTypes = [ (noteIdKey, np.int64), (coreNoteInterceptKey, np.double), @@ -451,9 +481,20 @@ def rater_factor_key(i): (expansionNoteInterceptMaxKey, np.double), (coverageNoteInterceptMinKey, np.double), (coverageNoteInterceptMaxKey, np.double), + (groupNoteInterceptKey, np.double), + (groupNoteFactor1Key, np.double), + (groupRatingStatusKey, np.str), + (groupNoteInterceptMaxKey, np.double), + (groupNoteInterceptMinKey, np.double), + (modelingGroupKey, np.float64), ] noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes] noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes} +deprecatedNoteModelOutputTSVColumnsAndTypes = [ + (col, dtype) + for (col, dtype) in noteModelOutputTSVColumnsAndTypes + if col in deprecatedNoteModelOutputColumns +] raterModelOutputTSVColumnsAndTypes = [ (raterParticipantIdKey, np.int64), @@ -481,6 +522,9 @@ def rater_factor_key(i): (isEmergingWriterKey, np.bool_), (aggregateRatingReceivedTotal, pd.Int64Dtype()), (timestampOfLastEarnOut, np.double), + (groupRaterInterceptKey, np.double), + (groupRaterFactor1Key, np.double), + (modelingGroupKey, np.float64), ] raterModelOutputTSVColumns = [col for (col, dtype) in raterModelOutputTSVColumnsAndTypes] raterModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in raterModelOutputTSVColumnsAndTypes} diff --git a/sourcecode/scoring/enums.py b/sourcecode/scoring/enums.py index d3ff9bd5..2e92df1f 100644 --- a/sourcecode/scoring/enums.py +++ b/sourcecode/scoring/enums.py @@ -6,8 +6,10 @@ class Scorers(Enum): """Exhaustive list of all scorers to simplify setting enabled/disabled scorers.""" MFCoreScorer = auto() - MFCoverageScorer = auto() MFExpansionScorer = auto() + # Note that the MFGroupScorer value controls whether *all* group scorers are instantiated, + # not just a single MFGroupScorer instance. + MFGroupScorer = auto() def scorers_from_csv(csv: str) -> Set[Scorers]: diff --git a/sourcecode/scoring/mf_base_scorer.py b/sourcecode/scoring/mf_base_scorer.py index 4acd52d1..d301ddab 100644 --- a/sourcecode/scoring/mf_base_scorer.py +++ b/sourcecode/scoring/mf_base_scorer.py @@ -87,6 +87,10 @@ def __init__( self._weightedTotalVotes = weightedTotalVotes self._mfRanker = matrix_factorization.MatrixFactorization() + def get_crh_threshold(self) -> float: + """Return CRH threshold for general scoring logic.""" + return self._crhThreshold + def get_scored_notes_cols(self) -> List[str]: """Returns a list of columns which should be present in the scoredNotes output.""" return [ @@ -160,6 +164,7 @@ def _score_notes_and_users( userScores pd.DataFrame: one row per user containing a column for each helpfulness score. """ if self._seed is not None: + print(f"seeding with {self._seed}") torch.manual_seed(self._seed) # Removes ratings where either (1) the note did not receive enough ratings, or diff --git a/sourcecode/scoring/mf_group_scorer.py b/sourcecode/scoring/mf_group_scorer.py new file mode 100644 index 00000000..756c2047 --- /dev/null +++ b/sourcecode/scoring/mf_group_scorer.py @@ -0,0 +1,285 @@ +from typing import Dict, List, Optional, Tuple + +from . import constants as c +from .mf_base_scorer import MFBaseScorer + +import numpy as np +import pandas as pd + + +# Number of MFGroupScorer objects we expect to instantiate +groupScorerCount = 12 + + +def _coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame: + """Condense all columns beginning with columnPrefix into a single column. + + With each row there must be at most one column with a non-NaN value in the set of + columns beginning with columnPrefix. If a non-NaN value is present that will + become the value in the condensed column, otherwise the value will be NaN. After + column values are condensed the original (prefixed) columns will be dropped. + + Args: + df: DataFrame containing columns to condense + collumnPrefix: Prefix used to detect columns to coalesce, and the name for + the output column. + + Returns: + DataFrame with all columns prefixed by columnPrefix dropped and replaced by + a single column named columnPrefix + + Raises: + AssertionError if multiple columns prefixed by columnPrefix have non-NaN values + for any row. + """ + # Identify columns to coalesce + columns = [col for col in df.columns if col.startswith(f"{columnPrefix}_")] + if not columns: + return df + # Validate that at most one column is set, and store which rows have a column set + rowResults = np.invert(df[columns].isna()).sum(axis=1) + assert all(rowResults <= 1), "each row should only be in one modeling group" + # Coalesce results + def _get_value(row): + idx = row.first_valid_index() + return row[idx] if idx is not None else np.nan + + coalesced = df[columns].apply(_get_value, axis=1) + # Drop old columns and replace with new + df = df.drop(columns=columns) + df[columnPrefix] = coalesced + return df + + +def coalesce_group_models( + scoredNotes: pd.DataFrame, helpfulnessScores: pd.DataFrame +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Coalesce all group modeling columns across note and user scoring. + + Since each Scorer must have distinct output columns, we use coalescing to run + multiple instances of MFGroupScorer objects and then condense the results into + a single set of columns. This approach works because each note will be scored + by at most one MFGroupScorer instance. + + Args: + scoredNotes: scoring output for notes. + helpfulnessScores: scoring output for users. + + Returns: + tuple containing coalesced scoring results for notes and users. + """ + for col in [ + c.groupNoteInterceptKey, + c.groupNoteFactor1Key, + c.groupRatingStatusKey, + c.groupNoteInterceptMaxKey, + c.groupNoteInterceptMinKey, + c.modelingGroupKey, + ]: + scoredNotes = _coalesce_columns(scoredNotes, col) + + for col in [c.groupRaterInterceptKey, c.groupRaterFactor1Key, c.modelingGroupKey]: + helpfulnessScores = _coalesce_columns(helpfulnessScores, col) + + return scoredNotes, helpfulnessScores + + +class MFGroupScorer(MFBaseScorer): + def __init__( + self, + groupNumber: int, + seed: Optional[int] = None, + pseudoraters: Optional[bool] = False, + groupThreshold: float = 0.8, + ) -> None: + """Configure MFGroupScorer object. + + Notice that each MFGroupScorer defines column names by appending the groupNumber to + column prefixes which are constant. Dynamically defining the column names allows the + group scorer to be instantiated multiple times while maintaining the property that + the columns attached by each scorer remain unique. Once all scorers have ran, we + (will) validate that each note was scored by at most one group scorer and then coalesce + all of the group scoring columns and remove the groupNumber suffix. + + Args: + groupNumber: int indicating which group this scorer instance should filter for. + seed: if not None, seed value to ensure deterministic execution + pseudoraters: if True, compute optional pseudorater confidence intervals + groupThreshold: float indicating what fraction of ratings must be from within a group + for the model to be active + """ + super().__init__(seed, pseudoraters) + assert groupNumber > 0, "groupNumber must be positive. 0 is reserved for unassigned." + assert groupNumber <= groupScorerCount, "groupNumber exceeds maximum expected groups." + self._groupNumber = groupNumber + self._groupThreshold = groupThreshold + self._groupNoteInterceptKey = f"{c.groupNoteInterceptKey}_{self._groupNumber}" + self._groupNoteFactor1Key = f"{c.groupNoteFactor1Key}_{self._groupNumber}" + self._groupRatingStatusKey = f"{c.groupRatingStatusKey}_{self._groupNumber}" + self._groupNoteInterceptMaxKey = f"{c.groupNoteInterceptMaxKey}_{self._groupNumber}" + self._groupNoteInterceptMinKey = f"{c.groupNoteInterceptMinKey}_{self._groupNumber}" + self._groupRaterInterceptKey = f"{c.groupRaterInterceptKey}_{self._groupNumber}" + self._groupRaterFactor1Key = f"{c.groupRaterFactor1Key}_{self._groupNumber}" + self._modelingGroupKey = f"{c.modelingGroupKey}_{self._groupNumber}" + + def _get_note_col_mapping(self) -> Dict[str, str]: + """Returns a dict mapping default note column names to custom names for a specific model.""" + return { + c.internalNoteInterceptKey: self._groupNoteInterceptKey, + c.internalNoteFactor1Key: self._groupNoteFactor1Key, + c.internalRatingStatusKey: self._groupRatingStatusKey, + c.noteInterceptMinKey: self._groupNoteInterceptMinKey, + c.noteInterceptMaxKey: self._groupNoteInterceptMaxKey, + } + + def _get_user_col_mapping(self) -> Dict[str, str]: + """Returns a dict mapping default user column names to custom names for a specific model.""" + return { + c.internalRaterInterceptKey: self._groupRaterInterceptKey, + c.internalRaterFactor1Key: self._groupRaterFactor1Key, + } + + def get_scored_notes_cols(self) -> List[str]: + """Returns a list of columns which should be present in the scoredNotes output.""" + return [ + c.noteIdKey, + self._groupNoteInterceptKey, + self._groupNoteFactor1Key, + self._groupRatingStatusKey, + self._groupNoteInterceptMaxKey, + self._groupNoteInterceptMinKey, + self._modelingGroupKey, + ] + + def get_helpfulness_scores_cols(self) -> List[str]: + """Returns a list of columns which should be present in the helpfulnessScores output.""" + return [ + c.raterParticipantIdKey, + self._groupRaterInterceptKey, + self._groupRaterFactor1Key, + self._modelingGroupKey, + ] + + def get_auxiliary_note_info_cols(self) -> List[str]: + """Returns a list of columns which should be present in the auxiliaryNoteInfo output.""" + return [] + + def _get_dropped_note_cols(self) -> List[str]: + """Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo.""" + return super()._get_dropped_note_cols() + ( + [ + c.internalActiveRulesKey, + c.activeFilterTagsKey, + c.ratingWeightKey, + ] + + c.notHelpfulTagsAdjustedColumns + + c.notHelpfulTagsAdjustedRatioColumns + + c.incorrectFilterColumns + + c.noteParameterUncertaintyTSVAuxColumns + ) + + def _get_dropped_user_cols(self) -> List[str]: + """Returns a list of columns which should be excluded from helpfulnessScores output.""" + return super()._get_dropped_user_cols() + [ + c.crhCrnhRatioDifferenceKey, + c.meanNoteScoreKey, + c.raterAgreeRatioKey, + c.aboveHelpfulnessThresholdKey, + ] + + def _filter_input( + self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollment: pd.DataFrame + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Prune the contents of ratings to only include ratings from users in the modeling group. + + This function identifies the subset of ratings to include in group model scoring. + To improve modeling within the group, we only include ratings from users in the modeling + group. However, we place no restriction on which notes to include in the model and instead + include ratings on any note. Including ratings on any note increases the amount of data + available during training about each user, in effect also increasing the number of users + and notes we are able to include in the model. + + Including notes by users outside of the modeling group means that the model will issue + scores for notes which do not meet group modeling criteria (i.e. >80% of ratings are + from users in the modeling group, and the author is also from the modeling group). We + enforce these criteria *after* scoring in _postprocess_output so that the maximum amount + of ratings are available during scoring. + + Args: + ratings (pd.DataFrame): preprocessed ratings + noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status + userEnrollment (pd.DataFrame): one row per user specifying enrollment properties + + Returns: + Tuple[pd.DataFrame, pd.DataFrame]: + ratings: ratings filtered to only contain rows of interest + noteStatusHistory: noteStatusHistory filtered to only contain rows of interest + """ + userEnrollment = userEnrollment.rename(columns={c.participantIdKey: c.raterParticipantIdKey}) + userEnrollment = userEnrollment[userEnrollment[c.modelingGroupKey] == self._groupNumber] + ratings = ratings.merge(userEnrollment[[c.raterParticipantIdKey]].drop_duplicates()) + return ratings, noteStatusHistory + + def _postprocess_output( + self, + noteScores: pd.DataFrame, + userScores: pd.DataFrame, + ratings: pd.DataFrame, + noteStatusHistory: pd.DataFrame, + userEnrollment: pd.DataFrame, + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Filter noteScores and userScores and add column containing modeling group. + + Enforce the requirements that: + - Notes scored by this group model have >=80% of ratings from the modeling group. + - Notes scored by this group model were authored by a member of the modeling group. + - Users assigned a factor / intercept are in the modeling group. + + Args: + noteScores: note outputs from scoring + userScores: user outputs from scoring + ratings (pd.DataFrame): preprocessed ratings + noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status + userEnrollment (pd.DataFrame): one row per user specifying enrollment properties + + Returns: + Tuple[pd.DataFrame, pd.DataFrame]: + noteScores: filtered and updated note scoring output + userScores: filtered and updated user scoring output + """ + # Prune notes according to authorship filter. + noteScores = noteScores.merge( + userEnrollment[[c.participantIdKey, c.modelingGroupKey]].rename( + columns={c.participantIdKey: c.noteAuthorParticipantIdKey} + ), + how="left", + ) + noteScores = noteScores[noteScores[c.modelingGroupKey] == self._groupNumber] + noteScores = noteScores.drop(columns=c.modelingGroupKey) + # Identify notes with enough ratings from within the modeling group. + ratings = ratings.merge( + userEnrollment[[c.participantIdKey, c.modelingGroupKey]].rename( + columns={c.participantIdKey: c.raterParticipantIdKey} + ), + how="left", + ) + ratings["inGroup"] = ratings[c.modelingGroupKey] == self._groupNumber + ratios = ratings[[c.noteIdKey, "inGroup"]].groupby(c.noteIdKey).mean().reset_index() + notesAboveThreshold = ratios[ratios["inGroup"] >= self._groupThreshold][[c.noteIdKey]] + noteScores = noteScores.merge(notesAboveThreshold) + # Note that even though ratings were restricted to the modeling group, users outside of + # the modeling group may still have authored a note which was rated and may consequently + # appear in the userScores. Accordingly, we drop any user which was outside of the + # modeling group from the user scores. + userScores = userScores.merge( + userEnrollment[[c.participantIdKey, c.modelingGroupKey]].rename( + columns={c.participantIdKey: c.raterParticipantIdKey} + ), + how="left", + ) + userScores = userScores[userScores[c.modelingGroupKey] == self._groupNumber] + userScores = userScores.drop(columns=c.modelingGroupKey) + # Set the modelingGroupKey column in each output + noteScores[self._modelingGroupKey] = self._groupNumber + userScores[self._modelingGroupKey] = self._groupNumber + return noteScores, userScores diff --git a/sourcecode/scoring/process_data.py b/sourcecode/scoring/process_data.py index 20c923b4..d62cd9c6 100644 --- a/sourcecode/scoring/process_data.py +++ b/sourcecode/scoring/process_data.py @@ -100,9 +100,34 @@ def tsv_parser( raise ValueError("invalid input") -def tsv_reader(path: str, mapping, columns, header=False): +# TODO: remove this function once modelingGroup column is fully launched +def user_enrollment_parser(rawTSV: str, header: bool) -> pd.DataFrame: + """Parse user enrollment TSV and optinoally tolerate the modelingGroup column. + + Args: + rawTSV: str contianing entire TSV input + header: bool indicating whether the input will have a header + + Returns: + pd.DataFrame containing parsed data + """ + try: + df = tsv_parser(rawTSV, c.userEnrollmentTSVTypeMapping, c.userEnrollmentTSVColumns, header) + df[c.modelingGroupKey] = 0 + except ValueError: + df = tsv_parser( + rawTSV, c.userEnrollmentExpandedTSVTypeMapping, c.userEnrollmentExpandedTSVColumns, header + ) + return df + + +# TODO: remove support for specifying a custom parser once modelingGroup is fully rolled out +def tsv_reader(path: str, mapping, columns, header=False, parser=tsv_parser): with open(path, "r") as handle: - return tsv_parser(handle.read(), mapping, columns, header) + if parser == tsv_parser: + return parser(handle.read(), mapping, columns, header) + else: + return parser(handle.read(), header) def read_from_tsv( @@ -163,13 +188,13 @@ def read_from_tsv( userEnrollment = None else: userEnrollment = tsv_reader( - userEnrollmentPath, c.userEnrollmentTSVTypeMapping, c.userEnrollmentTSVColumns, header=headers + userEnrollmentPath, None, None, header=headers, parser=user_enrollment_parser ) - assert len(userEnrollment.columns.values) == len(c.userEnrollmentTSVColumns) and all( - userEnrollment.columns == c.userEnrollmentTSVColumns + assert len(userEnrollment.columns.values) <= len(c.userEnrollmentExpandedTSVColumns) and ( + len(set(userEnrollment.columns) - set(c.userEnrollmentExpandedTSVColumns)) == 0 ), ( - f"userEnrollment columns don't match: \n{[col for col in userEnrollment.columns if not col in c.userEnrollmentTSVColumns]} are extra columns, " - + f"\n{[col for col in c.userEnrollmentTSVColumns if not col in userEnrollment.columns]} are missing." + f"userEnrollment columns don't match: \n{[col for col in userEnrollment.columns if not col in c.userEnrollmentExpandedTSVColumns]} are extra columns, " + + f"\n{[col for col in c.userEnrollmentExpandedTSVColumns if not col in userEnrollment.columns]} are missing." ) return notes, ratings, noteStatusHistory, userEnrollment diff --git a/sourcecode/scoring/run_scoring.py b/sourcecode/scoring/run_scoring.py index 622fb9f5..07c258a0 100644 --- a/sourcecode/scoring/run_scoring.py +++ b/sourcecode/scoring/run_scoring.py @@ -7,16 +7,17 @@ from collections import namedtuple import concurrent.futures +from itertools import chain import multiprocessing import time -from typing import List, Optional, Set, Tuple +from typing import Dict, List, Optional, Set, Tuple from . import constants as c, contributor_state, note_ratings, note_status_history, scoring_rules from .enums import Scorers from .mf_base_scorer import MFBaseScorer from .mf_core_scorer import MFCoreScorer -from .mf_coverage_scorer import MFCoverageScorer, MFDummyCoverageScorer from .mf_expansion_scorer import MFExpansionScorer +from .mf_group_scorer import coalesce_group_models, groupScorerCount, MFGroupScorer from .scorer import Scorer from .scoring_rules import RuleID @@ -76,7 +77,6 @@ def _check_flips( prevCrh = max( 1, len(unlockedPopNsh.loc[unlockedPopNsh[c.currentLabelKey] == c.currentlyRatedHelpful]) ) - print(f"Percentage of previous CRH flipping status: {(statusFlips / prevCrh)}") return (statusFlips / prevCrh) > pctFlips @@ -84,7 +84,7 @@ def _check_flips( def _get_scorers( seed: Optional[int], pseudoraters: Optional[bool], enabledScorers: Optional[Set[Scorers]] -) -> List[Scorer]: +) -> Dict[Scorers, List[Scorer]]: """Instantiate all Scorer objects which should be used for note ranking. Args: @@ -93,18 +93,20 @@ def _get_scorers( enabledScorers: if not None, set of which scorers should be instantiated and enabled Returns: - List[Scorer] containing instantiated Scorer objects for note ranking. + Dict[Scorers, List[Scorer]] containing instantiated Scorer objects for note ranking. """ - scorers: List[Scorer] = [] + scorers: Dict[Scorers, List[Scorer]] = dict() if enabledScorers is None or Scorers.MFCoreScorer in enabledScorers: - scorers.append(MFCoreScorer(seed, pseudoraters)) + scorers[Scorers.MFCoreScorer] = [MFCoreScorer(seed, pseudoraters)] if enabledScorers is None or Scorers.MFExpansionScorer in enabledScorers: - scorers.append(MFExpansionScorer(seed)) - if enabledScorers is not None and Scorers.MFCoverageScorer in enabledScorers: - scorers.append(MFCoverageScorer(seed)) - else: - scorers.append(MFDummyCoverageScorer()) + scorers[Scorers.MFExpansionScorer] = [MFExpansionScorer(seed)] + if enabledScorers is None or Scorers.MFGroupScorer in enabledScorers: + # Note that index 0 is reserved, corresponding to no group assigned, so scoring group + # numbers begin with index 1. + scorers[Scorers.MFGroupScorer] = [ + MFGroupScorer(groupNumber=i) for i in range(1, groupScorerCount + 1) + ] return scorers @@ -166,9 +168,9 @@ def _run_scorer_parallelizable(scorer, ratings, noteStatusHistory, userEnrollmen result = None flips = False - if isinstance( - scorer, (MFBaseScorer, MFCoreScorer, MFCoverageScorer, MFExpansionScorer) - ) and not isinstance(scorer, MFDummyCoverageScorer): + # TODO: encapsulate this at a lower level so we're not accessing private state and can deal + # with the case where there are no ratings for a scorer to run on after filtering. + if isinstance(scorer, (MFBaseScorer, MFCoreScorer, MFExpansionScorer)): while ( (len(scorer._mfRanker.train_errors) == 0) or (scorer._mfRanker.train_errors[-1] > c.maxTrainError) @@ -176,6 +178,8 @@ def _run_scorer_parallelizable(scorer, ratings, noteStatusHistory, userEnrollmen ): runCounter += 1 result = ModelResult(*scorer.score(ratings, noteStatusHistory, userEnrollment)) + if runCounter > c.maxReruns: + break # check for notes createdat > lock time, not more than x% have changed status from nsh if isinstance(scorer, MFExpansionScorer): flips = _check_flips( @@ -186,6 +190,18 @@ def _run_scorer_parallelizable(scorer, ratings, noteStatusHistory, userEnrollmen c.expansionFlipPct, c.expansionRatingStatusKey, ) + elif isinstance(scorer, MFCoreScorer): + flips = _check_flips( + result.scoredNotes, + noteStatusHistory, + userEnrollment, + [c.core], + c.coreFlipPct, + c.coreRatingStatusKey, + ) + if flips: + if scorer._seed is not None: + scorer._seed = scorer._seed + 1 else: result = ModelResult(*scorer.score(ratings, noteStatusHistory, userEnrollment)) runCounter += 1 @@ -263,11 +279,13 @@ def _run_scorers( modelResult.helpfulnessScores, modelResult.auxiliaryNoteInfo, ) + scoredNotes, helpfulnessScores = coalesce_group_models(scoredNotes, helpfulnessScores) return scoredNotes, helpfulnessScores, auxiliaryNoteInfo def meta_score( + scorers: Dict[Scorers, List[Scorer]], scoredNotes: pd.DataFrame, auxiliaryNoteInfo: pd.DataFrame, lockedStatus: pd.DataFrame, @@ -314,12 +332,26 @@ def meta_score( RuleID.CORE_MODEL, {RuleID.META_INITIAL_NMR}, c.coreRatingStatusKey ) ) - if enabledScorers is not None and Scorers.MFCoverageScorer in enabledScorers: - rules.append( - scoring_rules.ApplyAdditiveModelResult( - RuleID.COVERAGE_MODEL, {RuleID.META_INITIAL_NMR}, c.coverageRatingStatusKey, 0.38 + if enabledScorers is None or Scorers.MFGroupScorer in enabledScorers: + # TODO: modify this code to work when MFExpansionScorer is disabled by the system test + assert len(scorers[Scorers.MFCoreScorer]) == 1 + assert len(scorers[Scorers.MFExpansionScorer]) == 1 + coreScorer = scorers[Scorers.MFCoreScorer][0] + assert isinstance(coreScorer, MFCoreScorer) + expnasionScorer = scorers[Scorers.MFExpansionScorer][0] + assert isinstance(expnasionScorer, MFExpansionScorer) + coreCrhThreshold = coreScorer.get_crh_threshold() + expansionCrhThreshold = expnasionScorer.get_crh_threshold() + for i in range(1, groupScorerCount + 1): + rules.append( + scoring_rules.ApplyGroupModelResult( + RuleID[f"GROUP_MODEL_{i}"], + {RuleID.EXPANSION_MODEL, RuleID.CORE_MODEL}, + i, + coreCrhThreshold, + expansionCrhThreshold, + ) ) - ) rules.extend( [ scoring_rules.ScoringDriftGuard( @@ -510,6 +542,26 @@ def _compute_helpfulness_scores( return helpfulnessScores +def _add_deprecated_columns(scoredNotes: pd.DataFrame) -> pd.DataFrame: + """Impute columns which are no longer used but must be maintained in output. + + Args: + scoredNotes: DataFrame containing note scoring output + + Returns: + scoredNotes augmented to include deprecated columns filled with dummy values + """ + for column, columnType in c.deprecatedNoteModelOutputTSVColumnsAndTypes: + assert column not in scoredNotes.columns + if columnType == np.double: + scoredNotes[column] = np.nan + elif columnType == np.str: + scoredNotes[column] = "" + else: + assert False, f"column type {columnType} unsupported" + return scoredNotes + + def _validate( scoredNotes: pd.DataFrame, helpfulnessScores: pd.DataFrame, @@ -576,7 +628,7 @@ def run_scoring( # Apply individual scoring models and obtained merged result. scorers = _get_scorers(seed, pseudoraters, enabledScorers) scoredNotes, helpfulnessScores, auxiliaryNoteInfo = _run_scorers( - scorers, ratings, noteStatusHistory, userEnrollment + list(chain(*scorers.values())), ratings, noteStatusHistory, userEnrollment ) # Augment scoredNotes and auxiliaryNoteInfo with additional attributes for each note @@ -588,6 +640,7 @@ def run_scoring( # Assign final status to notes based on individual model scores and note attributes. scoredNotesCols, auxiliaryNoteInfoCols = meta_score( + scorers, scoredNotes, auxiliaryNoteInfo, noteStatusHistory[[c.noteIdKey, c.lockedStatusKey]], @@ -618,6 +671,7 @@ def run_scoring( ), "noteStatusHistory should contain all notes after preprocessing" # Skip validation and selection out output columns if the set of scorers is overridden. + scoredNotes = _add_deprecated_columns(scoredNotes) if strictColumns: scoredNotes, helpfulnessScores, newNoteStatusHistory, auxiliaryNoteInfo = _validate( scoredNotes, helpfulnessScores, newNoteStatusHistory, auxiliaryNoteInfo diff --git a/sourcecode/scoring/scorer.py b/sourcecode/scoring/scorer.py index 5353c72c..b01c42ba 100644 --- a/sourcecode/scoring/scorer.py +++ b/sourcecode/scoring/scorer.py @@ -58,6 +58,35 @@ def _filter_input( """ return ratings, noteStatusHistory + def _postprocess_output( + self, + noteScores: pd.DataFrame, + userScores: pd.DataFrame, + ratings: pd.DataFrame, + noteStatusHistory: pd.DataFrame, + userEnrollment: pd.DataFrame, + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Prune noteScores and userScores and augment with any additional columns as necessary. + + Note that ratings, noteStatusHistory and userEnrollment are expected to be the *raw* + versions which were supplied to "score", not the version output after filtering. + Operating on the raw versions allows accurately computing statistics over the entire dataset + (e.g. fraction of users from a modeling group). + + Args: + noteScores (pd.DataFrame): scoring output for notes + userScores (pd.DataFrame): scoirng output for users + ratings (pd.DataFrame): preprocessed ratings + noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status + userEnrollment (pd.DataFrame): one row per user specifying enrollment properties + + Returns: + Tuple[pd.DataFrame, pd.DataFrame]: + noteScores: note scoring output from _score_notes_and_users + userScores: user scoring output from _score_notes_and_users + """ + return noteScores, userScores + def _get_note_col_mapping(self) -> Dict[str, str]: """Returns a dict mapping default note column names to custom names for a specific model.""" return {} @@ -83,14 +112,17 @@ def _score_notes_and_users( """ def score( - self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollment: pd.DataFrame + self, + ratingsRaw: pd.DataFrame, + noteStatusHistoryRaw: pd.DataFrame, + userEnrollmentRaw: pd.DataFrame, ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame]]: """Process ratings to assign status to notes and optionally compute rater properties. Args: - ratings (pd.DataFrame): preprocessed ratings - noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status - userEnrollment (pd.DataFrame): one row per user specifying enrollment properties + ratingsRaw (pd.DataFrame): preprocessed ratings + noteStatusHistoryRaw (pd.DataFrame): one row per note; history of when note had each status + userEnrollmentRaw (pd.DataFrame): one row per user specifying enrollment properties Returns: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: @@ -99,8 +131,24 @@ def score( auxiliaryNoteInfo: one row per note containing adjusted and ratio tag values """ # Transform input, run core scoring algorithm, transform output. - ratings, noteStatusHistory = self._filter_input(ratings, noteStatusHistory, userEnrollment) + ratings, noteStatusHistory = self._filter_input( + ratingsRaw, noteStatusHistoryRaw, userEnrollmentRaw + ) + # If there are no ratings left after filtering, then return empty dataframes. + if len(ratings) == 0: + return ( + pd.DataFrame(columns=self.get_scored_notes_cols()), + pd.DataFrame(columns=self.get_helpfulness_scores_cols()) + if self.get_helpfulness_scores_cols() + else None, + pd.DataFrame(columns=self.get_auxiliary_note_info_cols()) + if self.get_auxiliary_note_info_cols() + else None, + ) noteScores, userScores = self._score_notes_and_users(ratings, noteStatusHistory) + noteScores, userScores = self._postprocess_output( + noteScores, userScores, ratingsRaw, noteStatusHistoryRaw, userEnrollmentRaw + ) noteScores = noteScores.rename(columns=self._get_note_col_mapping()) userScores = userScores.rename(columns=self._get_user_col_mapping()) # TODO: Tolerate unexpcted columns if --nostrict-columns is set. diff --git a/sourcecode/scoring/scoring_rules.py b/sourcecode/scoring/scoring_rules.py index 319c8fe7..ea9bda94 100644 --- a/sourcecode/scoring/scoring_rules.py +++ b/sourcecode/scoring/scoring_rules.py @@ -36,6 +36,18 @@ class RuleID(Enum): EXPANSION_MODEL = RuleAndVersion("ExpansionModel", "1.1", False) CORE_MODEL = RuleAndVersion("CoreModel", "1.1", True) COVERAGE_MODEL = RuleAndVersion("CoverageModel", "1.1", False) + GROUP_MODEL_1 = RuleAndVersion("GroupModel01", "1.1", False) + GROUP_MODEL_2 = RuleAndVersion("GroupModel02", "1.1", False) + GROUP_MODEL_3 = RuleAndVersion("GroupModel03", "1.1", False) + GROUP_MODEL_4 = RuleAndVersion("GroupModel04", "1.1", False) + GROUP_MODEL_5 = RuleAndVersion("GroupModel05", "1.1", False) + GROUP_MODEL_6 = RuleAndVersion("GroupModel06", "1.1", False) + GROUP_MODEL_7 = RuleAndVersion("GroupModel07", "1.1", False) + GROUP_MODEL_8 = RuleAndVersion("GroupModel08", "1.1", False) + GROUP_MODEL_9 = RuleAndVersion("GroupModel09", "1.1", False) + GROUP_MODEL_10 = RuleAndVersion("GroupModel10", "1.1", False) + GROUP_MODEL_11 = RuleAndVersion("GroupModel11", "1.1", False) + GROUP_MODEL_12 = RuleAndVersion("GroupModel12", "1.1", False) INSUFFICIENT_EXPLANATION = RuleAndVersion("InsufficientExplanation", "1.0", True) SCORING_DRIFT_GUARD = RuleAndVersion("ScoringDriftGuard", "1.0", False) @@ -186,56 +198,6 @@ def score_notes( return (noteStatusUpdates, None) -class ApplyAdditiveModelResult(ScoringRule): - def __init__( - self, - ruleID: RuleID, - dependencies: Set[RuleID], - sourceColumn: str, - coreThreshold: float, - ): - """Set CRH status based on probationary model subject to safeguard threshold on core model. - - This rule sets CRH note status based on probationary models subject to several criteria: - * The note must be have CRH status from the probationary model. - * The note must currently be scored as NMR. This criteria guarantees that (1) probationary - models strictly expand coverage and (2) notes which were rated CRH by the core - model never have the decidedBy field overwritten by a less confident model. - * The note must score above a defined threshold on the core model. This criteria acts as - safeguard against dangerous detections from probationary models. - - Args: - rule: enum corresponding to a namedtuple defining a rule name and version string for the ScoringRule. - dependencies: Rules which must run before this rule can run. - sourceColumn: column containing note status (CRH, CRNH, NMR) to propagate to output. - coreThreshold: minimum score which notes must receive from the core model. - """ - super().__init__(ruleID, dependencies) - self._sourceColumn = sourceColumn - self._coreThreshold = coreThreshold - - def score_notes( - self, noteStats: pd.DataFrame, currentLabels: pd.DataFrame, statusColumn: str - ) -> (Tuple[pd.DataFrame, Optional[pd.DataFrame]]): - """Flip notes from NMR to CRH based on probationary model and subject to core model safeguard.""" - # Identify notes which meet each of the 3 criteria. - probationaryCRHNotes = noteStats[noteStats[self._sourceColumn] == c.currentlyRatedHelpful][ - [c.noteIdKey] - ] - currentNMRNotes = currentLabels[currentLabels[statusColumn] == c.needsMoreRatings][ - [c.noteIdKey] - ] - aboveSafeGuard = noteStats[noteStats[c.coreNoteInterceptKey] > self._coreThreshold][ - [c.noteIdKey] - ] - # Identify overlap and return status update. - noteStatusUpdates = probationaryCRHNotes.merge( - currentNMRNotes, on=c.noteIdKey, how="inner" - ).merge(aboveSafeGuard, on=c.noteIdKey, how="inner") - noteStatusUpdates[statusColumn] = c.currentlyRatedHelpful - return (noteStatusUpdates, None) - - class FilterTagOutliers(ScoringRule): def __init__( self, @@ -358,6 +320,92 @@ def score_notes( return (noteStatusUpdates, None) +class ApplyGroupModelResult(ScoringRule): + def __init__( + self, + ruleID: RuleID, + dependencies: Set[RuleID], + groupNumber: int, + coreCrhThreshold: float, + expansionCrhThreshold: float, + minSafeguardThreshold: float = 0.3, + ): + """Set CRH status based on a modeling group result. + + This rule sets CRH note status based on group models subject to several criteria: + * The note must be have CRH status from the group model. + * The note must currently be scored as NMR. This criteria guarantees that (1) group + models strictly expand coverage and (2) notes which were rated CRH by the core + model never have the decidedBy field overwritten by a less confident model. + * The note must have an intercept from either the core or expansion models, and the intercept + of the most confident model must fall within a defined range. We construct the range + to guarantee we can avoid CRHing notes which substantially lacked broad appeal, and + to guarantee that we will not CRH a note which was blocked by tag or inaccuracy filtering + from either the core or expansion models, as applicable. + + Args: + ruleID: enum corresponding to a namedtuple defining a rule name and version string for the ScoringRule. + dependencies: Rules which must run before this rule can run. + groupNumber: modeling group index which this instance of ApplyGroupModelResult should act on. + coreCrhThreshold: maximum intercept allowed on core model for group model CRH notes. + expansionCrhThreshold: maximum intercept allowed on expansion model for group model CRH notes. + minSafeguardThreshold: minimum intercept for core or expansion model. + """ + super().__init__(ruleID, dependencies) + self._groupNumber = groupNumber + self._minSafeguardThreshold = minSafeguardThreshold + self._coreCrhThreshold = coreCrhThreshold + self._expansionCrhThreshold = expansionCrhThreshold + + def score_notes( + self, noteStats: pd.DataFrame, currentLabels: pd.DataFrame, statusColumn: str + ) -> (Tuple[pd.DataFrame, Optional[pd.DataFrame]]): + """Flip notes from NMR to CRH based on group models and subject to core/expansion model safeguards.""" + # Identify notes which were CRH from the applicable group model. + probationaryCRHNotes = noteStats[ + (noteStats[c.groupRatingStatusKey] == c.currentlyRatedHelpful) + & (noteStats[c.modelingGroupKey] == self._groupNumber) + ][[c.noteIdKey]] + # Identify notes which are currently NMR. + currentNMRNotes = currentLabels[currentLabels[statusColumn] == c.needsMoreRatings][ + [c.noteIdKey] + ] + # Identify notes which pass score bound checks for expansion and core models. + noteStats = noteStats[[c.noteIdKey, c.coreNoteInterceptKey, c.expansionNoteInterceptKey]].copy() + noteStats["core"] = (noteStats[c.coreNoteInterceptKey] < self._coreCrhThreshold) & ( + noteStats[c.coreNoteInterceptKey] > self._minSafeguardThreshold + ) + noteStats.loc[noteStats[c.coreNoteInterceptKey].isna(), "core"] = np.nan + noteStats["expansion"] = ( + noteStats[c.expansionNoteInterceptKey] < self._expansionCrhThreshold + ) & (noteStats[c.expansionNoteInterceptKey] > self._minSafeguardThreshold) + noteStats.loc[noteStats[c.expansionNoteInterceptKey].isna(), "expansion"] = np.nan + + def _get_value(row): + idx = row.first_valid_index() + # If either core or expansion had an intercept then return whether it was in the valid + # range. If neither had an intercept, return False. Preference is given to core due + # to the ordering when selecting columns from noteStats below. + if idx is None: + return False + elif row[idx] == 1.0: + return True + elif row[idx] == 0.0: + return False + else: + assert False, f"unexpected value: {row[idx]}" + + noteStats["actionable"] = noteStats[["core", "expansion"]].apply(_get_value, axis=1) + actionableNotes = noteStats[noteStats["actionable"]][[c.noteIdKey]] + + # Identify overlap and return status update. + noteStatusUpdates = probationaryCRHNotes.merge( + currentNMRNotes, on=c.noteIdKey, how="inner" + ).merge(actionableNotes, on=c.noteIdKey, how="inner") + noteStatusUpdates[statusColumn] = c.currentlyRatedHelpful + return (noteStatusUpdates, None) + + class InsufficientExplanation(ScoringRule): def __init__( self,