From 88bac22e0afd2b4843f0c84e8c747f6d013085b9 Mon Sep 17 00:00:00 2001
From: Brad Miller
Date: Fri, 22 Mar 2024 15:05:01 -0700
Subject: [PATCH] topic modeling

---
 sourcecode/scoring/constants.py           |  20 +-
 sourcecode/scoring/contributor_state.py   |  73 ++++++-
 sourcecode/scoring/enums.py               |  10 +
 sourcecode/scoring/mf_base_scorer.py      |  41 ++++
 sourcecode/scoring/mf_core_scorer.py      |   1 +
 sourcecode/scoring/mf_expansion_scorer.py |   1 +
 sourcecode/scoring/mf_group_scorer.py     |  54 +----
 sourcecode/scoring/mf_topic_scorer.py     | 251 ++++++++++++++++++++++
 sourcecode/scoring/process_data.py        |  34 ++-
 sourcecode/scoring/reputation_scorer.py   |   6 +-
 sourcecode/scoring/run_scoring.py         |  36 +++-
 sourcecode/scoring/runner.py              |   3 +-
 sourcecode/scoring/scorer.py              |  10 +-
 sourcecode/scoring/scoring_rules.py       |  48 +++++
 sourcecode/scoring/topic_model.py         | 198 +++++++++++++++++
 15 files changed, 695 insertions(+), 91 deletions(-)
 create mode 100644 sourcecode/scoring/mf_topic_scorer.py
 create mode 100644 sourcecode/scoring/topic_model.py

diff --git a/sourcecode/scoring/constants.py b/sourcecode/scoring/constants.py
index 86c144e8..2bc9ef3d 100644
--- a/sourcecode/scoring/constants.py
+++ b/sourcecode/scoring/constants.py
@@ -44,6 +44,7 @@ helpfulnessLevelKey = "helpfulnessLevel"
 createdAtMillisKey = "createdAtMillis"
 summaryKey = "summary"
+noteTopicKey = "noteTopic"
 authorTopNotHelpfulTagValues = "authorTopNotHelpfulTagValues"
 modelingPopulationKey = "modelingPopulation"
 modelingGroupKey = "modelingGroup"
@@ -148,6 +149,11 @@ def rater_factor_key(i):
 groupNoteInterceptMinKey = "groupNoteInterceptMin"
 groupRaterInterceptKey = "groupRaterIntercept"
 groupRaterFactor1Key = "groupRaterFactor1"
+# Topic Model
+topicNoteInterceptKey = "topicNoteIntercept"
+topicNoteFactor1Key = "topicNoteFactor1"
+topicRatingStatusKey = "topicRatingStatus"
+topicNoteConfidentKey = "topicNoteConfident"
 # Harassment/Abuse Tag
 harassmentNoteInterceptKey = "harassmentNoteIntercept"
 harassmentNoteFactor1Key = "harassmentNoteFactor1"
@@ -408,6 +414,8 @@ def rater_factor_key(i):
 core = "CORE"
 expansion = "EXPANSION"
 expansionPlus = "EXPANSION_PLUS"
+topWriterWritingImpact = 10
+topWriterHitRate = 0.04
 
 userEnrollmentTSVColumnsAndTypes = [
   (participantIdKey, str),
@@ -422,11 +430,6 @@ def rater_factor_key(i):
 userEnrollmentTSVColumns = [col for (col, _) in userEnrollmentTSVColumnsAndTypes]
 userEnrollmentTSVTypes = [dtype for (_, dtype) in userEnrollmentTSVColumnsAndTypes]
 userEnrollmentTSVTypeMapping = {col: dtype for (col, dtype) in userEnrollmentTSVColumnsAndTypes}
-# TODO: Remove the "old" user enrollment schemas below once numberOfTimesEarnedOut is in production
-userEnrollmentTSVColumnsOld = [col for (col, _) in userEnrollmentTSVColumnsAndTypes[:7]]
-userEnrollmentTSVTypeMappingOld = {
-  col: dtype for (col, dtype) in userEnrollmentTSVColumnsAndTypes[:7]
-}
 
 noteInterceptMaxKey = "internalNoteIntercept_max"
 noteInterceptMinKey = "internalNoteIntercept_min"
@@ -532,6 +535,11 @@ def rater_factor_key(i):
   (expansionPlusNoteInterceptKey, np.double),
   (expansionPlusNoteFactor1Key, np.double),
   (expansionPlusRatingStatusKey, str),
+  (topicNoteInterceptKey, np.double),
+  (topicNoteFactor1Key, np.double),
+  (topicRatingStatusKey, str),
+  (noteTopicKey, str),
+  (topicNoteConfidentKey, str),
 ]
 noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes]
 noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes}
@@ -571,7 +579,7 @@ def rater_factor_key(i):
   (groupRaterFactor1Key, np.double),
   (modelingGroupKey, np.float64),
   (raterHelpfulnessReputationKey, np.double),
-  (numberOfTimesEarnedOutKey, np.int64),
+  (numberOfTimesEarnedOutKey, np.float64),
 ]
 raterModelOutputTSVColumns = [col for (col, dtype) in raterModelOutputTSVColumnsAndTypes]
 raterModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in raterModelOutputTSVColumnsAndTypes}

diff --git a/sourcecode/scoring/contributor_state.py b/sourcecode/scoring/contributor_state.py
index 1bc88ad7..7fac8ff8 100644
--- a/sourcecode/scoring/contributor_state.py
+++ b/sourcecode/scoring/contributor_state.py
@@ -77,6 +77,28 @@ def is_earned_in(authorEnrollmentCounts):
   )
 
+def is_top_writer(authorEnrollmentCounts):
+  """
+  An author is a top writer when they have a Writing Impact of at least 10 and a hit rate of at least 4%.
+
+  Args:
+    authorEnrollmentCounts (pd.DataFrame): Scored Notes + User Enrollment status
+  """
+  # check whether any notes have been written at all to avoid divide by zero
+  totalNotes = (
+    authorEnrollmentCounts[c.notesCurrentlyRatedHelpful]
+    + authorEnrollmentCounts[c.notesCurrentlyRatedNotHelpful]
+    + authorEnrollmentCounts[c.notesAwaitingMoreRatings]
+  ).apply(lambda row: max([row, 1]), 1)
+  writingImpact = (
+    authorEnrollmentCounts[c.notesCurrentlyRatedHelpful]
+    - authorEnrollmentCounts[c.notesCurrentlyRatedNotHelpful]
+  )
+  return (writingImpact >= c.topWriterWritingImpact) & (
+    (writingImpact / totalNotes) >= c.topWriterHitRate
+  )
+
+
 def _get_rated_after_decision(
   ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame
 ) -> pd.DataFrame:
@@ -302,6 +324,45 @@ def is_emerging_writer(scoredNotes: pd.DataFrame):
   return emergingWriter[[c.noteAuthorParticipantIdKey, c.isEmergingWriterKey]]
 
+def calculate_ri_to_earn_in(contributorScoresWithEnrollment: pd.DataFrame) -> pd.DataFrame:
+  """
+  Update the rating impact required to earn back in for earned-out users.
+
+  The requirement escalates with each earn-out, except for top writers, who receive
+  the non-escalating requirement.
+
+  Args:
+    contributorScoresWithEnrollment (pd.DataFrame): contributor scores with user enrollment state
+  Returns:
+    pd.DataFrame: contributor scores with updated earn-in requirements and enrollment state
+  """
+  earnedOutUsers = (
+    contributorScoresWithEnrollment[c.enrollmentState]
+    == c.enrollmentStateToThrift[c.earnedOutNoAcknowledge]
+  )
+
+  contributorScoresWithEnrollment.loc[
+    earnedOutUsers, c.successfulRatingNeededToEarnIn
+  ] = contributorScoresWithEnrollment.apply(
+    lambda row: c.ratingImpactForEarnIn
+    + max([row[c.ratingImpact], 0])
+    + (c.ratingImpactForEarnIn * row[c.numberOfTimesEarnedOutKey]),
+    axis=1,
+  ).loc[earnedOutUsers]
+
+  # for top writers, overwrite the score required to earn in with non-escalating version
+  topWriters = is_top_writer(contributorScoresWithEnrollment)
+
+  contributorScoresWithEnrollment.loc[
+    (earnedOutUsers) & (topWriters), c.successfulRatingNeededToEarnIn
+  ] = contributorScoresWithEnrollment.apply(
+    lambda row: c.ratingImpactForEarnIn + max([row[c.ratingImpact], 0]),
+    axis=1,
+  ).loc[(earnedOutUsers) & (topWriters)]
+
+  contributorScoresWithEnrollment.loc[
+    earnedOutUsers, c.enrollmentState
+  ] = c.enrollmentStateToThrift[c.earnedOutAcknowledged]
+
+  return contributorScoresWithEnrollment.drop(columns=[c.ratingImpact])
+
+
 def get_contributor_state(
   scoredNotes: pd.DataFrame,
   ratings: pd.DataFrame,
@@ -409,18 +470,10 @@ def get_contributor_state(
     contributorScoresWithEnrollment.loc[earnedOutUsers, c.numberOfTimesEarnedOutKey] + 1
   )
 
-  contributorScoresWithEnrollment.loc[
-    earnedOutUsers, c.successfulRatingNeededToEarnIn
-  ] = contributorScoresWithEnrollment.loc[earnedOutUsers].apply(
-    lambda row: c.ratingImpactForEarnIn
-    + max([row[c.ratingImpact], 0])
-    + (c.ratingImpactForEarnIn * row[c.numberOfTimesEarnedOutKey]),
-    axis=1,
-  )
-
+  # use earned out no ack internally to identify newly earned out users
   contributorScoresWithEnrollment.loc[
     earnedOutUsers, c.enrollmentState
-  ] = c.enrollmentStateToThrift[c.earnedOutAcknowledged]
+  ] = c.enrollmentStateToThrift[c.earnedOutNoAcknowledge]
 
   contributorScoresWithEnrollment.loc[
     is_earned_in(contributorScoresWithEnrollment), c.enrollmentState

diff --git a/sourcecode/scoring/enums.py b/sourcecode/scoring/enums.py
index 5835563d..cff342f0 100644
--- a/sourcecode/scoring/enums.py
+++ b/sourcecode/scoring/enums.py
@@ -12,6 +12,16 @@ class Scorers(Enum):
   MFGroupScorer = auto()
   MFExpansionPlusScorer = auto()
   ReputationScorer = auto()
+  MFTopicScorer = auto()
+
+
+class Topics(Enum):
+  """Range of the note topic model."""
+
+  Unassigned = 0
+  UkraineConflict = 1
+  GazaConflict = 2
+  MessiRonaldo = 3
 
 def scorers_from_csv(csv: str) -> Set[Scorers]:

diff --git a/sourcecode/scoring/mf_base_scorer.py b/sourcecode/scoring/mf_base_scorer.py
index 0db9ac34..5e471d89 100644
--- a/sourcecode/scoring/mf_base_scorer.py
+++ b/sourcecode/scoring/mf_base_scorer.py
@@ -11,6 +11,47 @@
 import torch
 
+def coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame:
+  """Condense all columns beginning with columnPrefix into a single column.
+
+  For each row there must be at most one column with a non-NaN value in the set of
+  columns beginning with columnPrefix. If a non-NaN value is present that will
+  become the value in the condensed column, otherwise the value will be NaN. After
+  column values are condensed the original (prefixed) columns will be dropped.
+
+  Args:
+    df: DataFrame containing columns to condense
+    columnPrefix: Prefix used to detect columns to coalesce, and the name for
+      the output column.
+
+  Returns:
+    DataFrame with all columns prefixed by columnPrefix dropped and replaced by
+    a single column named columnPrefix
+
+  Raises:
+    AssertionError if multiple columns prefixed by columnPrefix have non-NaN values
+    for any row.
+  """
+  # Identify columns to coalesce
+  columns = [col for col in df.columns if col.startswith(f"{columnPrefix}_")]
+  if not columns:
+    return df
+  # Validate that at most one column is set, and store which rows have a column set
+  rowResults = np.invert(df[columns].isna()).sum(axis=1)
+  assert all(rowResults <= 1), "each row should only be in one modeling group"
+
+  # Coalesce results
+  def _get_value(row):
+    idx = row.first_valid_index()
+    return row[idx] if idx is not None else np.nan
+
+  coalesced = df[columns].apply(_get_value, axis=1)
+  # Drop old columns and replace with new
+  df = df.drop(columns=columns)
+  df[columnPrefix] = coalesced
+  return df
+
+
 def get_ratings_for_stable_init(
   ratingsForTraining: pd.DataFrame,
   userEnrollmentRaw: pd.DataFrame,

diff --git a/sourcecode/scoring/mf_core_scorer.py b/sourcecode/scoring/mf_core_scorer.py
index aaa0d366..c4d5786f 100644
--- a/sourcecode/scoring/mf_core_scorer.py
+++ b/sourcecode/scoring/mf_core_scorer.py
@@ -172,6 +172,7 @@ def get_helpfulness_scores_cols(self) -> List[str]:
 
   def _filter_input(
     self,
+    noteTopics: pd.DataFrame,
     ratingsOrig: pd.DataFrame,
     noteStatusHistoryOrig: pd.DataFrame,
     userEnrollment: pd.DataFrame,

diff --git a/sourcecode/scoring/mf_expansion_scorer.py b/sourcecode/scoring/mf_expansion_scorer.py
index 9a88c289..74314362 100644
--- a/sourcecode/scoring/mf_expansion_scorer.py
+++ b/sourcecode/scoring/mf_expansion_scorer.py
@@ -92,6 +92,7 @@ def _get_dropped_user_cols(self) -> List[str]:
 
   def _filter_input(
     self,
+    noteTopics: pd.DataFrame,
     ratingsOrig: pd.DataFrame,
     noteStatusHistoryOrig: pd.DataFrame,
     userEnrollment: pd.DataFrame,

diff --git a/sourcecode/scoring/mf_group_scorer.py b/sourcecode/scoring/mf_group_scorer.py
index 22b94859..fc20a6bc 100644
--- a/sourcecode/scoring/mf_group_scorer.py
+++ b/sourcecode/scoring/mf_group_scorer.py
@@ -1,9 +1,8 @@
 from typing import Dict, List, Optional, Tuple
 
 from . import constants as c
-from .mf_base_scorer import MFBaseScorer
+from .mf_base_scorer import MFBaseScorer, coalesce_columns
 
-import numpy as np
 import pandas as pd
 
@@ -21,47 +20,6 @@
 }
 
-def _coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame:
-  """Condense all columns beginning with columnPrefix into a single column.
-
-  With each row there must be at most one column with a non-NaN value in the set of
-  columns beginning with columnPrefix. If a non-NaN value is present that will
-  become the value in the condensed column, otherwise the value will be NaN. After
-  column values are condensed the original (prefixed) columns will be dropped.
-
-  Args:
-    df: DataFrame containing columns to condense
-    collumnPrefix: Prefix used to detect columns to coalesce, and the name for
-      the output column.
-
-  Returns:
-    DataFrame with all columns prefixed by columnPrefix dropped and replaced by
-    a single column named columnPrefix
-
-  Raises:
-    AssertionError if multiple columns prefixed by columnPrefix have non-NaN values
-    for any row.
-  """
-  # Identify columns to coalesce
-  columns = [col for col in df.columns if col.startswith(f"{columnPrefix}_")]
-  if not columns:
-    return df
-  # Validate that at most one column is set, and store which rows have a column set
-  rowResults = np.invert(df[columns].isna()).sum(axis=1)
-  assert all(rowResults <= 1), "each row should only be in one modeling group"
-
-  # Coalesce results
-  def _get_value(row):
-    idx = row.first_valid_index()
-    return row[idx] if idx is not None else np.nan
-
-  coalesced = df[columns].apply(_get_value, axis=1)
-  # Drop old columns and replace with new
-  df = df.drop(columns=columns)
-  df[columnPrefix] = coalesced
-  return df
-
-
 def coalesce_group_models(
   scoredNotes: pd.DataFrame, helpfulnessScores: pd.DataFrame
 ) -> Tuple[pd.DataFrame, pd.DataFrame]:
@@ -87,10 +45,10 @@ def coalesce_group_models(
     c.groupNoteInterceptMinKey,
     c.modelingGroupKey,
   ]:
-    scoredNotes = _coalesce_columns(scoredNotes, col)
+    scoredNotes = coalesce_columns(scoredNotes, col)
 
   for col in [c.groupRaterInterceptKey, c.groupRaterFactor1Key, c.modelingGroupKey]:
-    helpfulnessScores = _coalesce_columns(helpfulnessScores, col)
+    helpfulnessScores = coalesce_columns(helpfulnessScores, col)
 
   return scoredNotes, helpfulnessScores
 
@@ -251,7 +209,11 @@ def _get_dropped_user_cols(self) -> List[str]:
     ]
 
   def _filter_input(
-    self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollment: pd.DataFrame
+    self,
+    noteTopics: pd.DataFrame,
+    ratings: pd.DataFrame,
+    noteStatusHistory: pd.DataFrame,
+    userEnrollment: pd.DataFrame,
   ) -> Tuple[pd.DataFrame, pd.DataFrame]:
     """Prune the contents of ratings to only include ratings from users in the modeling group.
 
diff --git a/sourcecode/scoring/mf_topic_scorer.py b/sourcecode/scoring/mf_topic_scorer.py
new file mode 100644
index 00000000..4767dc2d
--- /dev/null
+++ b/sourcecode/scoring/mf_topic_scorer.py
@@ -0,0 +1,251 @@
+from typing import Dict, List, Optional, Tuple
+
+from . import constants as c
+from .mf_base_scorer import MFBaseScorer, coalesce_columns
+
+import pandas as pd
+
+
+def coalesce_topic_models(scoredNotes: pd.DataFrame) -> pd.DataFrame:
+  """Coalesce all topic modeling columns in the note scoring output.
+
+  Since each Scorer must have distinct output columns, we use coalescing to run
+  multiple instances of MFTopicScorer objects and then condense the results into
+  a single set of columns. This approach works because each note will be scored
+  by at most one MFTopicScorer instance.
+
+  Args:
+    scoredNotes: scoring output for notes.
+
+  Returns:
+    DataFrame containing coalesced scoring results for notes.
+  """
+  for col in [
+    c.topicNoteInterceptKey,
+    c.topicNoteFactor1Key,
+    c.topicRatingStatusKey,
+    c.topicNoteConfidentKey,
+    c.noteTopicKey,
+  ]:
+    scoredNotes = coalesce_columns(scoredNotes, col)
+
+  return scoredNotes
+
+
+class MFTopicScorer(MFBaseScorer):
+  def __init__(
+    self,
+    topicName: str,
+    seed: Optional[int] = None,
+    pseudoraters: Optional[bool] = False,
+    saveIntermediateState: bool = False,
+    userFactorLambda=None,
+    noteFactorLambda=None,
+    userInterceptLambda=None,
+    noteInterceptLambda=None,
+    globalInterceptLambda=None,
+    diamondLambda=None,
+    normalizedLossHyperparameters=None,
+    maxFirstMFTrainError: float = 0.16,
+    maxFinalMFTrainError: float = 0.09,
+    minMeanNoteScore: float = 0.05,
+    crhThreshold: float = 0.40,
+    crnhThresholdIntercept: float = -0.05,
+    crnhThresholdNoteFactorMultiplier: float = -0.8,
+    crnhThresholdNMIntercept: float = -0.15,
+    crhSuperThreshold: float = 0.5,
+    lowDiligenceThreshold: float = 0.263,
+    factorThreshold: float = 0.5,
+    multiplyPenaltyByHarassmentScore: bool = True,
+    minimumHarassmentScoreToPenalize: float = 2.0,
+    tagConsensusHarassmentHelpfulRatingPenalty: int = 10,
+  ) -> None:
+    """Configure MFTopicScorer object.
+
+    Notice that each MFTopicScorer defines column names by appending the topicName to
+    column prefixes which are constant. Dynamically defining the column names allows the
+    topic scorer to be instantiated multiple times while maintaining the property that
+    the columns attached by each scorer remain unique. Once all scorers have run, we
+    (will) validate that each note was scored by at most one topic scorer and then coalesce
+    all of the topic scoring columns and remove the topicName suffix.
+
+    Args:
+      topicName: str indicating which topic this scorer instance should filter for.
+      seed: if not None, seed value to ensure deterministic execution
+      pseudoraters: if True, compute optional pseudorater confidence intervals
+    """
+    super().__init__(
+      seed,
+      pseudoraters,
+      useStableInitialization=False,
+      saveIntermediateState=saveIntermediateState,
+      threads=4,
+      userFactorLambda=userFactorLambda,
+      noteFactorLambda=noteFactorLambda,
+      userInterceptLambda=userInterceptLambda,
+      noteInterceptLambda=noteInterceptLambda,
+      globalInterceptLambda=globalInterceptLambda,
+      diamondLambda=diamondLambda,
+      normalizedLossHyperparameters=normalizedLossHyperparameters,
+      maxFirstMFTrainError=maxFirstMFTrainError,
+      maxFinalMFTrainError=maxFinalMFTrainError,
+      minMeanNoteScore=minMeanNoteScore,
+      crhThreshold=crhThreshold,
+      crnhThresholdIntercept=crnhThresholdIntercept,
+      crnhThresholdNoteFactorMultiplier=crnhThresholdNoteFactorMultiplier,
+      crnhThresholdNMIntercept=crnhThresholdNMIntercept,
+      crhSuperThreshold=crhSuperThreshold,
+      lowDiligenceThreshold=lowDiligenceThreshold,
+      factorThreshold=factorThreshold,
+      multiplyPenaltyByHarassmentScore=multiplyPenaltyByHarassmentScore,
+      minimumHarassmentScoreToPenalize=minimumHarassmentScoreToPenalize,
+      tagConsensusHarassmentHelpfulRatingPenalty=tagConsensusHarassmentHelpfulRatingPenalty,
+    )
+    self._topicName = topicName
+    self._topicNoteInterceptKey = f"{c.topicNoteInterceptKey}_{self._topicName}"
+    self._topicNoteFactor1Key = f"{c.topicNoteFactor1Key}_{self._topicName}"
+    self._topicRatingStatusKey = f"{c.topicRatingStatusKey}_{self._topicName}"
+    self._noteTopicKey = f"{c.noteTopicKey}_{self._topicName}"
+    self._noteTopicConfidentKey = f"{c.topicNoteConfidentKey}_{self._topicName}"
+
+  def get_name(self):
+    return f"MFTopicScorer_{self._topicName}"
+
+  def _get_note_col_mapping(self) -> Dict[str, str]:
+    """Returns a dict mapping default note column names to custom names for a specific model."""
+    return {
+      c.internalNoteInterceptKey: self._topicNoteInterceptKey,
+      c.internalNoteFactor1Key: self._topicNoteFactor1Key,
+      c.internalRatingStatusKey: self._topicRatingStatusKey,
+    }
+
+  def get_scored_notes_cols(self) -> List[str]:
+    """Returns a list of columns which should be present in the scoredNotes output."""
+    return [
+      c.noteIdKey,
+      self._topicNoteInterceptKey,
+      self._topicNoteFactor1Key,
+      self._topicRatingStatusKey,
+      self._noteTopicKey,
+      self._noteTopicConfidentKey,
+    ]
+
+  def get_helpfulness_scores_cols(self) -> List[str]:
+    """Returns a list of columns which should be present in the helpfulnessScores output."""
+    return []
+
+  def get_auxiliary_note_info_cols(self) -> List[str]:
+    """Returns a list of columns which should be present in the auxiliaryNoteInfo output."""
+    return []
+
+  def _get_dropped_note_cols(self) -> List[str]:
+    """Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo."""
+    return super()._get_dropped_note_cols() + (
+      [
+        c.internalActiveRulesKey,
+        c.activeFilterTagsKey,
+        c.ratingWeightKey,
+        c.noteInterceptMinKey,
+        c.noteInterceptMaxKey,
+      ]
+      + c.notHelpfulTagsAdjustedColumns
+      + c.notHelpfulTagsAdjustedRatioColumns
+      + c.incorrectFilterColumns
+      + c.noteParameterUncertaintyTSVAuxColumns
+    )
+
+  def _get_dropped_user_cols(self) -> List[str]:
+    """Returns a list of columns which should be excluded from helpfulnessScores output."""
+    return super()._get_dropped_user_cols() + [
+      c.crhCrnhRatioDifferenceKey,
+      c.meanNoteScoreKey,
+      c.raterAgreeRatioKey,
+      c.aboveHelpfulnessThresholdKey,
+      c.internalRaterInterceptKey,
+      c.internalRaterFactor1Key,
+      c.raterParticipantIdKey,
+    ]
+
+  def _filter_input(
+    self,
+    noteTopics: pd.DataFrame,
+    ratings: pd.DataFrame,
+    noteStatusHistory: pd.DataFrame,
+    userEnrollment: pd.DataFrame,
+  ) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    """Prune the contents of ratings to only include ratings from notes on this topic.
+
+    Args:
+      noteTopics: DF pairing notes and topics
+      ratings (pd.DataFrame): preprocessed ratings
+      noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
+      userEnrollment (pd.DataFrame): one row per user specifying enrollment properties
+
+    Returns:
+      Tuple[pd.DataFrame, pd.DataFrame]:
+        ratings: ratings filtered to only contain rows of interest
+        noteStatusHistory: noteStatusHistory filtered to only contain rows of interest
+    """
+    notes = noteTopics[noteTopics[c.noteTopicKey] == self._topicName][[c.noteIdKey]]
+    ratings = ratings.merge(notes)
+    noteStatusHistory = noteStatusHistory.merge(notes)
+    return ratings, noteStatusHistory
+
+  def _postprocess_output(
+    self,
+    noteScores: pd.DataFrame,
+    userScores: pd.DataFrame,
+    ratings: pd.DataFrame,
+    noteStatusHistory: pd.DataFrame,
+    userEnrollment: pd.DataFrame,
+  ) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    """Add noteTopic and noteTopicConfident columns to the note scoring output.
+
+    Args:
+      noteScores: note outputs from scoring
+      userScores: user outputs from scoring
+      ratings (pd.DataFrame): preprocessed ratings
+      noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
+      userEnrollment (pd.DataFrame): one row per user specifying enrollment properties
+
+    Returns:
+      Tuple[pd.DataFrame, pd.DataFrame]:
+        noteScores: filtered and updated note scoring output
+        userScores: filtered and updated user scoring output
+    """
+    # Set the note topic column in the note output
+    noteScores[self._noteTopicKey] = self._topicName
+    # Calculate total counts of positive and negative factor ratings
+    scoredNotes = noteScores[~noteScores[c.internalNoteInterceptKey].isna()][[c.noteIdKey]]
+    posFactorRaters = userScores[userScores[c.internalRaterFactor1Key] >= 0][
+      [c.raterParticipantIdKey]
+    ]
+    posFactorRatings = (
+      ratings[[c.noteIdKey, c.raterParticipantIdKey]].merge(scoredNotes).merge(posFactorRaters)
+    )
+    posFactorCounts = (
+      posFactorRatings.groupby(c.noteIdKey)
+      .count()
+      .reset_index(drop=False)
+      .rename(columns={c.raterParticipantIdKey: "posRatingTotal"})
+    )
+    negFactorRaters = userScores[userScores[c.internalRaterFactor1Key] < 0][
+      [c.raterParticipantIdKey]
+    ]
+    negFactorRatings = (
+      ratings[[c.noteIdKey, c.raterParticipantIdKey]].merge(scoredNotes).merge(negFactorRaters)
+    )
+    negFactorCounts = (
+      negFactorRatings.groupby(c.noteIdKey)
+      .count()
+      .reset_index(drop=False)
+      .rename(columns={c.raterParticipantIdKey: "negRatingTotal"})
+    )
+    # Set scoring confidence bit
+    posFactorCounts = posFactorCounts[posFactorCounts["posRatingTotal"] > 4][[c.noteIdKey]]
+    negFactorCounts = negFactorCounts[negFactorCounts["negRatingTotal"] > 4][[c.noteIdKey]]
+    confidentNotes = posFactorCounts.merge(negFactorCounts)
+    confidentNotes[self._noteTopicConfidentKey] = True
+    noteScores = noteScores.merge(confidentNotes, how="left")
+    noteScores = noteScores.fillna({self._noteTopicConfidentKey: False})
+    return noteScores, userScores

diff --git a/sourcecode/scoring/process_data.py b/sourcecode/scoring/process_data.py
index dca467fd..ce089611 100644
--- a/sourcecode/scoring/process_data.py
+++ b/sourcecode/scoring/process_data.py
@@ -147,28 +147,18 @@ def read_from_tsv(
   if userEnrollmentPath is None:
     userEnrollment = None
   else:
-    try:
-      userEnrollment = tsv_reader(
-        userEnrollmentPath,
-        c.userEnrollmentTSVTypeMapping,
-        c.userEnrollmentTSVColumns,
-        header=headers,
-      )
-      assert len(userEnrollment.columns.values) == len(c.userEnrollmentTSVColumns) and all(
-        userEnrollment.columns == c.userEnrollmentTSVColumns
-      ), (
-        f"userEnrollment columns don't match: \n{[col for col in userEnrollment.columns if not col in c.userEnrollmentTSVColumns]} are extra columns, "
-        + f"\n{[col for col in c.userEnrollmentTSVColumns if not col in userEnrollment.columns]} are missing."
-      )
-    except ValueError:
-      # TODO: clean up fallback for old mappings once numberOfTimesEarnedOut column is in production
-      userEnrollment = tsv_reader(
-        userEnrollmentPath,
-        c.userEnrollmentTSVTypeMappingOld,
-        c.userEnrollmentTSVColumnsOld,
-        header=headers,
-      )
-      userEnrollment[c.numberOfTimesEarnedOutKey] = 0
+    userEnrollment = tsv_reader(
+      userEnrollmentPath,
+      c.userEnrollmentTSVTypeMapping,
+      c.userEnrollmentTSVColumns,
+      header=headers,
+    )
+    assert len(userEnrollment.columns.values) == len(c.userEnrollmentTSVColumns) and all(
+      userEnrollment.columns == c.userEnrollmentTSVColumns
+    ), (
+      f"userEnrollment columns don't match: \n{[col for col in userEnrollment.columns if not col in c.userEnrollmentTSVColumns]} are extra columns, "
+      + f"\n{[col for col in c.userEnrollmentTSVColumns if not col in userEnrollment.columns]} are missing."
+    )
 
   return notes, ratings, noteStatusHistory, userEnrollment
 
diff --git a/sourcecode/scoring/reputation_scorer.py b/sourcecode/scoring/reputation_scorer.py
index 38f35fa1..d49a9aae 100644
--- a/sourcecode/scoring/reputation_scorer.py
+++ b/sourcecode/scoring/reputation_scorer.py
@@ -72,7 +72,11 @@ def _get_dropped_user_cols(self) -> List[str]:
     return []
 
   def _filter_input(
-    self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollment: pd.DataFrame
+    self,
+    noteTopics: pd.DataFrame,
+    ratings: pd.DataFrame,
+    noteStatusHistory: pd.DataFrame,
+    userEnrollment: pd.DataFrame,
   ) -> Tuple[pd.DataFrame, pd.DataFrame]:
     ratings, noteStatusHistory = filter_core_input(ratings, noteStatusHistory, userEnrollment)
     ratings = filter_ratings(ratings, self._minNumRatingsPerRater, self._minNumRatersPerNote)

diff --git a/sourcecode/scoring/run_scoring.py b/sourcecode/scoring/run_scoring.py
index 94e9fac7..19e8c901 100644
--- a/sourcecode/scoring/run_scoring.py
+++ b/sourcecode/scoring/run_scoring.py
@@ -13,7 +13,7 @@
 from typing import Dict, List, Optional, Set, Tuple
 
 from . import constants as c, contributor_state, note_ratings, note_status_history, scoring_rules
-from .enums import Scorers
+from .enums import Scorers, Topics
 from .matrix_factorization.normalized_loss import NormalizedLossHyperparameters
 from .mf_core_scorer import MFCoreScorer
 from .mf_expansion_plus_scorer import MFExpansionPlusScorer
@@ -24,10 +24,12 @@
   groupScorerCount,
   trialScoringGroup,
 )
+from .mf_topic_scorer import MFTopicScorer, coalesce_topic_models
 from .process_data import CommunityNotesDataLoader
 from .reputation_scorer import ReputationScorer
 from .scorer import Scorer
 from .scoring_rules import RuleID
+from .topic_model import TopicModel
 
 import numpy as np
 import pandas as pd
@@ -109,6 +111,10 @@ def _get_scorers(
       tagConsensusHarassmentHelpfulRatingPenalty=10,
     )
   )
+  if enabledScorers is None or Scorers.MFTopicScorer in enabledScorers:
+    scorers[Scorers.MFTopicScorer] = [
+      MFTopicScorer(topicName=topic.name, seed=seed) for topic in Topics
+    ]
 
   return scorers
 
@@ -167,6 +173,7 @@ def _merge_results(
 def _run_scorer_parallelizable(
   scorer: Scorer,
   runParallel: bool,
+  noteTopics: pd.DataFrame,
   ratings: Optional[pd.DataFrame] = None,
   noteStatusHistory: Optional[pd.DataFrame] = None,
   userEnrollment: Optional[pd.DataFrame] = None,
@@ -178,7 +185,7 @@ def _run_scorer_parallelizable(
     _, ratings, noteStatusHistory, userEnrollment = dataLoader.get_data()
 
   scorerStartTime = time.perf_counter()
-  result = ModelResult(*scorer.score(ratings, noteStatusHistory, userEnrollment))
+  result = ModelResult(*scorer.score(noteTopics, ratings, noteStatusHistory, userEnrollment))
   scorerEndTime = time.perf_counter()
 
   return result, (scorerEndTime - scorerStartTime)
 
@@ -186,6 +193,7 @@ def _run_scorer_parallelizable(
 def _run_scorers(
   scorers: List[Scorer],
+  noteTopics: pd.DataFrame,
   ratings: pd.DataFrame,
   noteStatusHistory: pd.DataFrame,
   userEnrollment: pd.DataFrame,
@@ -202,6 +210,7 @@ def _run_scorers(
 
   Args:
     scorers (List[Scorer]): Instantiated Scorer objects for note ranking.
+    noteTopics: DF pairing notes with topics
     ratings (pd.DataFrame): Complete DF containing all ratings after preprocessing.
     noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
     userEnrollment (pd.DataFrame): The enrollment state for each contributor
@@ -224,6 +233,7 @@ def _run_scorers(
           _run_scorer_parallelizable,
           scorer=scorer,
           runParallel=True,
+          noteTopics=noteTopics,
           dataLoader=dataLoader,
         )
         for scorer in scorers
@@ -234,6 +244,7 @@ def _run_scorers(
       _run_scorer_parallelizable(
         scorer=scorer,
         runParallel=False,
+        noteTopics=noteTopics,
        ratings=ratings,
         noteStatusHistory=noteStatusHistory,
         userEnrollment=userEnrollment,
@@ -270,6 +281,7 @@ def _run_scorers(
       modelResult.auxiliaryNoteInfo,
     )
   scoredNotes, helpfulnessScores = coalesce_group_models(scoredNotes, helpfulnessScores)
+  scoredNotes = coalesce_topic_models(scoredNotes)
 
   return scoredNotes, helpfulnessScores, auxiliaryNoteInfo
 
@@ -366,7 +378,17 @@ def meta_score(
         minSafeguardThreshold=None,
       )
     )
-
+  if enabledScorers is None or Scorers.MFTopicScorer in enabledScorers:
+    for topic in Topics:
+      if topic == Topics.Unassigned:
+        continue
+      rules.append(
+        scoring_rules.ApplyTopicModelResult(
+          RuleID[f"TOPIC_MODEL_{topic.value}"],
+          {RuleID.EXPANSION_PLUS_MODEL, RuleID.EXPANSION_MODEL, RuleID.CORE_MODEL},
+          topic,
+        )
+      )
   rules.extend(
     [
       scoring_rules.ScoringDriftGuard(
@@ -541,11 +563,13 @@ def _compute_helpfulness_scores(
         c.authorTopNotHelpfulTagValues,
         c.isEmergingWriterKey,
         c.numberOfTimesEarnedOutKey,
+        c.ratingImpact,
       ]
     ],
     on=c.raterParticipantIdKey,
     how="outer",
   )
+  contributorScores = contributor_state.calculate_ri_to_earn_in(contributorScores)
 
   # Consolidates all information on raters / authors.
   helpfulnessScores = helpfulnessScores.merge(
@@ -624,6 +648,7 @@ def _validate(
 
 def run_scoring(
+  notes: pd.DataFrame,
   ratings: pd.DataFrame,
   noteStatusHistory: pd.DataFrame,
   userEnrollment: pd.DataFrame,
@@ -639,6 +664,7 @@ def run_scoring(
   """Invokes note scoring algorithms, merges results and computes user stats.
 
   Args:
+    notes (pd.DataFrame): notes including text
     ratings (pd.DataFrame): preprocessed ratings
     noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
     userEnrollment (pd.DataFrame): The enrollment state for each contributor
@@ -658,11 +684,15 @@ def run_scoring(
   """
   # Apply individual scoring models and obtained merged result.
   currentTimeMillis = c.epochMillis
+  with c.time_block("Note Topic Assignment"):
+    topicModel = TopicModel()
+    noteTopics = topicModel.get_note_topics(notes)
   scorers = _get_scorers(
     seed, pseudoraters, enabledScorers, useStableInitialization=useStableInitialization
   )
   scoredNotes, helpfulnessScores, auxiliaryNoteInfo = _run_scorers(
     scorers=list(chain(*scorers.values())),
+    noteTopics=noteTopics,
     ratings=ratings,
     noteStatusHistory=noteStatusHistory,
     userEnrollment=userEnrollment,

diff --git a/sourcecode/scoring/runner.py b/sourcecode/scoring/runner.py
index 7f83e72f..496b3117 100644
--- a/sourcecode/scoring/runner.py
+++ b/sourcecode/scoring/runner.py
@@ -90,10 +90,11 @@ def main():
 
   # Load input dataframes.
   dataLoader = LocalDataLoader(args.notes, args.ratings, args.status, args.enrollment, args.headers)
-  _, ratings, statusHistory, userEnrollment = dataLoader.get_data()
+  notes, ratings, statusHistory, userEnrollment = dataLoader.get_data()
 
   # Invoke scoring and user contribution algorithms.
   scoredNotes, helpfulnessScores, newStatus, auxNoteInfo = run_scoring(
+    notes,
     ratings,
     statusHistory,
     userEnrollment,

diff --git a/sourcecode/scoring/scorer.py b/sourcecode/scoring/scorer.py
index a1d99708..8fde9eb8 100644
--- a/sourcecode/scoring/scorer.py
+++ b/sourcecode/scoring/scorer.py
@@ -62,7 +62,11 @@ def _get_dropped_user_cols(self) -> List[str]:
     """Returns a list of columns which should be excluded from helpfulnessScores output."""
 
   def _filter_input(
-    self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollment: pd.DataFrame
+    self,
+    noteTopics: pd.DataFrame,
+    ratings: pd.DataFrame,
+    noteStatusHistory: pd.DataFrame,
+    userEnrollment: pd.DataFrame,
   ) -> Tuple[pd.DataFrame, pd.DataFrame]:
     """Prune the contents of ratings and noteStatusHistory to scope model behavior.
 
@@ -134,6 +138,7 @@ def _score_notes_and_users(
 
   def score(
     self,
+    noteTopics: pd.DataFrame,
     ratingsRaw: pd.DataFrame,
     noteStatusHistoryRaw: pd.DataFrame,
     userEnrollmentRaw: pd.DataFrame,
@@ -141,6 +146,7 @@ def score(
     """Process ratings to assign status to notes and optionally compute rater properties.
 
     Args:
+      noteTopics: DF specifying {note, topic} pairs
       ratingsRaw (pd.DataFrame): preprocessed ratings
       noteStatusHistoryRaw (pd.DataFrame): one row per note; history of when note had each status
       userEnrollmentRaw (pd.DataFrame): one row per user specifying enrollment properties
@@ -156,7 +162,7 @@ def score(
     # Transform input, run core scoring algorithm, transform output.
     with self.time_block("Filter input"):
       ratings, noteStatusHistory = self._filter_input(
-        ratingsRaw, noteStatusHistoryRaw, userEnrollmentRaw
+        noteTopics, ratingsRaw, noteStatusHistoryRaw, userEnrollmentRaw
       )
     # If there are no ratings left after filtering, then return empty dataframes.
     if len(ratings) == 0:

diff --git a/sourcecode/scoring/scoring_rules.py b/sourcecode/scoring/scoring_rules.py
index 11fca7f6..1ba0acc8 100644
--- a/sourcecode/scoring/scoring_rules.py
+++ b/sourcecode/scoring/scoring_rules.py
@@ -4,6 +4,7 @@
 from typing import Callable, List, Optional, Set, Tuple
 
 from . import constants as c, tag_filter
+from .enums import Topics
 from .explanation_tags import top_tags
 
 import numpy as np
@@ -53,6 +54,9 @@ class RuleID(Enum):
   GROUP_MODEL_14 = RuleAndVersion("GroupModel14", "1.1", False)
   INSUFFICIENT_EXPLANATION = RuleAndVersion("InsufficientExplanation", "1.0", True)
   SCORING_DRIFT_GUARD = RuleAndVersion("ScoringDriftGuard", "1.0", False)
+  TOPIC_MODEL_1 = RuleAndVersion("TopicModel01", "1.0", False)
+  TOPIC_MODEL_2 = RuleAndVersion("TopicModel02", "1.0", False)
+  TOPIC_MODEL_3 = RuleAndVersion("TopicModel03", "1.0", False)
 
   def get_name(self) -> str:
     """Returns a string combining the name and version to uniquely name the logic of the ScoringRule."""
@@ -735,6 +739,50 @@ def score_notes(
     return noteStatusUpdates, unlockedStatus
 
+class ApplyTopicModelResult(ScoringRule):
+  def __init__(
+    self,
+    ruleID: RuleID,
+    dependencies: Set[RuleID],
+    topic: Topics,
+    topicNMRInterceptThreshold: Optional[float] = 0.25,
+    topicNMRFactorThreshold: Optional[float] = 0.5,
+  ):
+    """Set any note scored by a topic model to NMR if the note is presently CRH and has a low topic intercept or an extreme topic factor.
+
+    Args:
+      ruleID: enum corresponding to a namedtuple defining a rule name and version string for the ScoringRule.
+      dependencies: Rules which must run before this rule can run.
+      topic: apply the rule to notes scored by a given topic model.
+      topicNMRInterceptThreshold: topic intercept below which a CRH note is set to NMR.
+      topicNMRFactorThreshold: absolute topic factor above which a CRH note is set to NMR.
+    """
+    super().__init__(ruleID, dependencies)
+    self._topic = topic
+    self._topicNMRInterceptThreshold = topicNMRInterceptThreshold
+    self._topicNMRFactorThreshold = topicNMRFactorThreshold
+
+  def score_notes(
+    self, noteStats: pd.DataFrame, currentLabels: pd.DataFrame, statusColumn: str
+  ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
+    """Flip notes from CRH to NMR based on the topic model results."""
+    # Identify notes which are currently CRH.
+    currentCRHNotes = currentLabels[currentLabels[statusColumn] == c.currentlyRatedHelpful][
+      [c.noteIdKey]
+    ]
+    # Identify notes which are below threshold from the applicable topic model.
+    topicLowNotes = noteStats[
+      (
+        (noteStats[c.topicNoteInterceptKey] < self._topicNMRInterceptThreshold)
+        | (noteStats[c.topicNoteFactor1Key].abs() > self._topicNMRFactorThreshold)
+      )
+      & (noteStats[c.topicNoteConfidentKey])
+      & (noteStats[c.noteTopicKey] == self._topic.name)
+    ][[c.noteIdKey]]
+    # Set note status for candidates and return
+    noteStatusUpdates = currentCRHNotes.merge(topicLowNotes, on=c.noteIdKey, how="inner")
+    noteStatusUpdates[statusColumn] = c.needsMoreRatings
+    return (noteStatusUpdates, None)
+
+
 def apply_scoring_rules(
   noteStats: pd.DataFrame,
   rules: List[ScoringRule],

diff --git a/sourcecode/scoring/topic_model.py b/sourcecode/scoring/topic_model.py
new file mode 100644
index 00000000..906a3993
--- /dev/null
+++ b/sourcecode/scoring/topic_model.py
@@ -0,0 +1,198 @@
+"""Assign notes to a set of predetermined topics.
+
+The topic assignment process is seeded with a small set of terms which are indicative of
+the topic. After preliminary topic assignment based on term matching, a logistic regression
+model trained on bag-of-words features expands the set of in-topic notes for each topic.
+Note that the logistic regression modeling excludes any tokens containing seed terms.
+
+This is a preliminary approach to topic assignment while Community Notes
+evaluates the efficacy of per-topic note scoring.
+"""
+
+from typing import List, Tuple
+
+from . import constants as c
+
+from .enums import Topics
+
+import numpy as np
+import pandas as pd
+from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
+from sklearn.linear_model import LogisticRegression
+from sklearn.metrics import balanced_accuracy_score
+from sklearn.pipeline import Pipeline
+
+
+class TopicModel(object):
+  def __init__(self):
+    """Initialize a list of seed terms for each topic."""
+    self._seedTerms = {
+      Topics.UkraineConflict: {
+        "ukraine",
+        "russia",
+        "kiev",
+        "kyiv",
+        "moscow",
+        "zelensky",
+        "putin",
+      },
+      Topics.GazaConflict: {
+        "israel",
+        "palestin",
+        "gaza",
+        "jerusalem",
+      },
+      Topics.MessiRonaldo: {
+        "messi",
+        "ronaldo",
+      },
+    }
+
+  def _make_seed_labels(self, texts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+    """Produce a label vector based on seed terms.
+
+    The label vector has type np.int64 with values corresponding to the enum value for
+    each topic. Any text which matches seed terms from multiple topics is left unassigned.
+
+    Args:
+      texts: array containing strings for topic assignment
+
+    Returns:
+      Tuple[0]: array specifying topic labels for texts
+      Tuple[1]: array specifying texts that are unassigned due to conflicting matches.
+    """
+    texts = np.array([text.lower() for text in texts])
+    labels = np.zeros(texts.shape[0], dtype=np.int64)
+    conflictedTexts = np.zeros(texts.shape[0])
+    for topic in Topics:
+      if topic == Topics.Unassigned:
+        continue
+      topicMatches = np.array(
+        [any(term in text for term in self._seedTerms[topic]) for text in texts]
+      )
+      labels[topicMatches] = topic.value
+      conflictedTexts += topicMatches.astype(np.int64)
+    labels[conflictedTexts > 1] = Topics.Unassigned.value
+    print(f"  Notes unassigned due to multiple matches: {(conflictedTexts > 1).sum()}")
+    return labels, conflictedTexts > 1
+
+  def _get_stop_words(self, texts: np.ndarray) -> List[str]:
+    """Identify tokens in the extracted vocabulary that contain seed terms.
+
+    Any token containing a seed term will be treated as a stop word (i.e. removed
+    from the extracted features). This prevents the model from training on the same
+    tokens used to label the data.
+
+    Args:
+      texts: array containing strings for topic assignment
+
+    Returns:
+      List specifying which tokens to exclude from the features.
+    """
+    # Extract vocabulary
+    cv = CountVectorizer(strip_accents="unicode")
+    cv.fit(texts)
+    rawVocabulary = cv.vocabulary_.keys()
+    print(f"  Initial vocabulary length: {len(rawVocabulary)}")
+    # Identify stop words
+    blockedTokens = set()
+    for terms in self._seedTerms.values():
+      blockedTokens |= terms
+    print(f"  Total tokens to filter: {len(blockedTokens)}")
+    stopWords = [v for v in rawVocabulary if any(t in v for t in blockedTokens)]
+    print(f"  Total identified stopwords: {len(stopWords)}")
+    return stopWords
+
+  def _merge_predictions_and_labels(
+    self, predictions: np.ndarray, labels: np.ndarray
+  ) -> np.ndarray:
+    """Update predictions based on defined labels when the label is not Unassigned.
+
+    Args:
+      predictions: 1D matrix specifying the class label as an int64 assigned to each row.
+      labels: 1D matrix specifying the seed label for each row, when available.
+
+    Returns:
+      Updated predictions based on keyword matches when available.
+    """
+    for label in range(1, len(Topics)):
+      predictions[labels == label] = label
+    return predictions
+
+  def _prepare_post_text(self, notes: pd.DataFrame) -> pd.DataFrame:
+    """Concatenate all notes within each post into a single row associated with the post.
+
+    Args:
+      notes: dataframe containing raw note text
+
+    Returns:
+      DataFrame with one post per row containing note text
+    """
+    postNoteText = (
+      notes[[c.tweetIdKey, c.summaryKey]]
+      .fillna({c.summaryKey: ""})
+      .groupby(c.tweetIdKey)[c.summaryKey]
+      .apply(lambda postNotes: " ".join(postNotes))
+      .reset_index(drop=False)
+    )
+    # Default tokenization for CountVectorizer will not split on underscore, which
+    # results in very long tokens containing many words inside of URLs. Removing
+    # underscores allows us to keep default splitting while fixing that problem.
+    postNoteText[c.summaryKey] = [
+      text.replace("_", " ") for text in postNoteText[c.summaryKey].values
+    ]
+    return postNoteText
+
+  def get_note_topics(self, notes: pd.DataFrame) -> pd.DataFrame:
+    """Return a DataFrame specifying each {note, topic} pair.
+
+    Notes that are not assigned to a topic do not appear in the dataframe.
+
+    Args:
+      notes: DF containing all notes to potentially assign to a topic
+    """
+    print("Assigning notes to topics:")
+    # Obtain aggregate post text, seed labels and stop words
+    postText = self._prepare_post_text(notes)
+    seedLabels, conflictedTexts = self._make_seed_labels(postText[c.summaryKey].values)
+    stopWords = self._get_stop_words(postText[c.summaryKey].values)
+    # Define and fit model
+    pipe = Pipeline(
+      [
+        (
+          "UnigramEncoder",
+          CountVectorizer(
+            strip_accents="unicode",
+            stop_words=stopWords,
+            min_df=25,
+            max_df=max(1000, int(0.25 * len(postText))),
+          ),
+        ),
+        ("tfidf", TfidfTransformer()),
+        ("Classifier", LogisticRegression(max_iter=1000, verbose=1)),
+      ],
+      verbose=True,
+    )
+    pipe.fit(
+      # Notice that we omit posts with an unclear label from training.
+      postText[c.summaryKey].values[~conflictedTexts],
+      seedLabels[~conflictedTexts],
+    )
+    # Predict notes. Notice that in effect we are looking to see which notes in the
+    # training data the model felt were mis-labeled after the training process
+    # completed, and generating labels for any posts which were omitted from the
+    # original training.
+    pred = pipe.predict(postText[c.summaryKey].values)
+    balancedAccuracy = balanced_accuracy_score(seedLabels[~conflictedTexts], pred[~conflictedTexts])
+    print(f"  Balanced accuracy on raw predictions: {balancedAccuracy}")
+    assert balancedAccuracy > 0.5, f"Balanced accuracy too low: {balancedAccuracy}"
+    # Validate that any conflicted text is Unassigned
+    assert all(seedLabels[conflictedTexts] == Topics.Unassigned.value)
+    pred = self._merge_predictions_and_labels(pred, seedLabels)
+    print(f"  Topic assignment results: {np.bincount(pred)}")
+
+    # Assign topics to notes based on aggregated note text, and drop any
+    # notes on posts that were unassigned.
+    postText[c.noteTopicKey] = [Topics(t).name for t in pred]
+    postText = postText[postText[c.noteTopicKey] != Topics.Unassigned.name]
+    noteTopics = notes[[c.noteIdKey, c.tweetIdKey]].merge(postText[[c.tweetIdKey, c.noteTopicKey]])
+    return noteTopics.drop(columns=c.tweetIdKey)
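
A few illustrative sketches follow; none are part of the diff. First, the schema change from np.int64 to np.float64 for numberOfTimesEarnedOut: a likely motivation (my reading; the patch does not state it) is that the rater output frame is assembled with outer merges, and a NumPy integer column cannot hold the NaN values an outer merge introduces:

import numpy as np
import pandas as pd

raters = pd.DataFrame({"raterParticipantId": ["a", "b"]})
enrollment = pd.DataFrame(
  {"raterParticipantId": ["a"], "numberOfTimesEarnedOut": np.array([2], dtype=np.int64)}
)
merged = raters.merge(enrollment, how="outer")  # rater "b" has no enrollment row
print(merged["numberOfTimesEarnedOut"].dtype)  # float64: the NaN for "b" forces an upcast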
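The earn-in change in contributor_state.py combines two pieces: is_top_writer (Writing Impact >= topWriterWritingImpact and hit rate >= topWriterHitRate) and a requirement that escalates with each earn-out. A worked sketch of the arithmetic; RATING_IMPACT_FOR_EARN_IN is a placeholder for c.ratingImpactForEarnIn, whose value is defined in constants.py and not shown in this patch:

RATING_IMPACT_FOR_EARN_IN = 5  # placeholder; the real value lives in constants.py

def required_rating_impact(rating_impact, times_earned_out, top_writer):
  # Base requirement: the earn-in constant plus any positive rating impact already earned.
  base = RATING_IMPACT_FOR_EARN_IN + max(rating_impact, 0)
  if top_writer:
    return base  # non-escalating path for top writers
  return base + RATING_IMPACT_FOR_EARN_IN * times_earned_out  # escalates per earn-out

# An author with CRH=12, CRNH=1, awaiting=7 has Writing Impact 11 >= 10 and a hit
# rate of 11/20 = 0.55 >= 0.04, so they qualify as a top writer.
print(required_rating_impact(rating_impact=-2, times_earned_out=3, top_writer=False))  # 20
print(required_rating_impact(rating_impact=-2, times_earned_out=3, top_writer=True))   # 5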
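coalesce_columns, moved from mf_group_scorer.py into mf_base_scorer.py so the topic scorer can reuse it, condenses per-scorer-instance columns into one. A minimal sketch of the same logic with illustrative column names:

import numpy as np
import pandas as pd

# Each note is scored by at most one topic scorer, so at most one prefixed
# column holds a value per row.
df = pd.DataFrame({
  "noteId": [1, 2, 3],
  "topicNoteIntercept_UkraineConflict": [0.31, np.nan, np.nan],
  "topicNoteIntercept_GazaConflict": [np.nan, 0.12, np.nan],
})

prefix = "topicNoteIntercept"
cols = [col for col in df.columns if col.startswith(f"{prefix}_")]
assert (df[cols].notna().sum(axis=1) <= 1).all()

def first_valid(row):
  idx = row.first_valid_index()
  return row[idx] if idx is not None else np.nan

df[prefix] = df[cols].apply(first_valid, axis=1)
df = df.drop(columns=cols)
print(df)  # noteId 3 keeps NaN: no topic scorer emitted a value for it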
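MFTopicScorer._postprocess_output marks a note as topic-confident only when it has more than four ratings from positive-factor raters and more than four from negative-factor raters. A toy version of that computation:

import pandas as pd

ratings = pd.DataFrame({
  "noteId": [1] * 12 + [2] * 6,
  "raterParticipantId": list(range(12)) + list(range(6)),
})
userScores = pd.DataFrame({"raterParticipantId": range(12), "factor": [0.5] * 6 + [-0.5] * 6})

merged = ratings.merge(userScores, on="raterParticipantId")
counts = merged.groupby("noteId")["factor"].agg(
  pos=lambda s: int((s >= 0).sum()), neg=lambda s: int((s < 0).sum())
)
confident = (counts["pos"] > 4) & (counts["neg"] > 4)
print(confident)  # note 1: True (6 raters on each side); note 2: False (no negative-factor raters)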
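ApplyTopicModelResult only ever moves a note from CRH to NMR; the condition pairs a low topic intercept (< 0.25 by default) or an extreme topic factor (|factor| > 0.5) with the confidence bit and a topic match. A condensed sketch of the mask:

import pandas as pd

noteStats = pd.DataFrame({
  "noteId": [1, 2, 3],
  "topicNoteIntercept": [0.10, 0.45, 0.10],
  "topicNoteFactor1": [0.1, 0.9, 0.2],
  "topicNoteConfident": [True, True, False],
  "noteTopic": ["GazaConflict"] * 3,
})
currentCRHNotes = pd.DataFrame({"noteId": [1, 2, 3]})  # all currently CRH

topicLowNotes = noteStats[
  ((noteStats["topicNoteIntercept"] < 0.25) | (noteStats["topicNoteFactor1"].abs() > 0.5))
  & noteStats["topicNoteConfident"]
  & (noteStats["noteTopic"] == "GazaConflict")
][["noteId"]]
updates = currentCRHNotes.merge(topicLowNotes, on="noteId", how="inner")
updates["status"] = "NEEDS_MORE_RATINGS"
# Note 1 flips for its low intercept, note 2 for its extreme factor; note 3 would
# qualify on intercept but is spared because its confidence bit is unset.
print(updates)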
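TopicModel._make_seed_labels assigns each post to the topic whose seed terms it matches, and withholds posts matching several topics from training. The same logic on toy data:

import numpy as np

# Values mirror the Topics enum: 1=UkraineConflict, 2=GazaConflict, 3=MessiRonaldo.
seedTerms = {1: {"ukraine", "kyiv"}, 2: {"gaza"}, 3: {"messi", "ronaldo"}}
texts = np.array(["blast near kyiv", "messi or ronaldo?", "ukraine and gaza", "unrelated note"])

labels = np.zeros(len(texts), dtype=np.int64)
matchCounts = np.zeros(len(texts))
for topic, terms in seedTerms.items():
  matches = np.array([any(t in text for t in terms) for text in texts])
  labels[matches] = topic
  matchCounts += matches

labels[matchCounts > 1] = 0  # conflicting matches revert to Unassigned (0)
print(labels)  # [1 3 0 0]: the third text matched two topics, the fourth matched none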
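Finally, _get_stop_words blocks any vocabulary token that contains a seed term as a substring, so the classifier cannot simply relearn the labeling rule. The matching is substring-based, which is presumably why the patch seeds the stem "palestin" rather than "palestine"; a full word like "ukraine" does not catch "ukrainian":

vocabulary = ["ukrainian", "kyivite", "prokyiv", "weather", "football"]
seed_terms = {"ukraine", "kyiv", "gaza", "messi", "ronaldo"}
stop_words = [v for v in vocabulary if any(t in v for t in seed_terms)]
print(stop_words)  # ['kyivite', 'prokyiv']; "ukrainian" survives because "ukraine" is not a substring of it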