Skip to content

Commit

Permalink
Merge pull request #209 from twitter/bradm/topic_models
Browse files Browse the repository at this point in the history
Initial Topic Models
  • Loading branch information
bradmiller authored Mar 23, 2024
2 parents c7db275 + 88bac22 commit 17048d4
Show file tree
Hide file tree
Showing 15 changed files with 695 additions and 91 deletions.
20 changes: 14 additions & 6 deletions sourcecode/scoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
helpfulnessLevelKey = "helpfulnessLevel"
createdAtMillisKey = "createdAtMillis"
summaryKey = "summary"
noteTopicKey = "noteTopic"
authorTopNotHelpfulTagValues = "authorTopNotHelpfulTagValues"
modelingPopulationKey = "modelingPopulation"
modelingGroupKey = "modelingGroup"
Expand Down Expand Up @@ -148,6 +149,11 @@ def rater_factor_key(i):
groupNoteInterceptMinKey = "groupNoteInterceptMin"
groupRaterInterceptKey = "groupRaterIntercept"
groupRaterFactor1Key = "groupRaterFactor1"
# Topic Model
topicNoteInterceptKey = "topicNoteIntercept"
topicNoteFactor1Key = "topicNoteFactor1"
topicRatingStatusKey = "topicRatingStatus"
topicNoteConfidentKey = "topicNoteConfident"
# Harassment/Abuse Tag
harassmentNoteInterceptKey = "harassmentNoteIntercept"
harassmentNoteFactor1Key = "harassmentNoteFactor1"
Expand Down Expand Up @@ -408,6 +414,8 @@ def rater_factor_key(i):
core = "CORE"
expansion = "EXPANSION"
expansionPlus = "EXPANSION_PLUS"
# Thresholds read by contributor_state.is_top_writer: the minimum writing impact
# (notes currently rated helpful minus notes currently rated not helpful) and the
# minimum hit rate (writing impact divided by total notes) for a top writer.
topWriterWritingImpact = 10
topWriterHitRate = 0.04

userEnrollmentTSVColumnsAndTypes = [
(participantIdKey, str),
Expand All @@ -422,11 +430,6 @@ def rater_factor_key(i):
userEnrollmentTSVColumns = [col for (col, _) in userEnrollmentTSVColumnsAndTypes]
userEnrollmentTSVTypes = [dtype for (_, dtype) in userEnrollmentTSVColumnsAndTypes]
userEnrollmentTSVTypeMapping = {col: dtype for (col, dtype) in userEnrollmentTSVColumnsAndTypes}
# TODO: Remove the "old" user enrollment schemas below once numberOfTimesEarnedOut is in production
userEnrollmentTSVColumnsOld = [col for (col, _) in userEnrollmentTSVColumnsAndTypes[:7]]
userEnrollmentTSVTypeMappingOld = {
col: dtype for (col, dtype) in userEnrollmentTSVColumnsAndTypes[:7]
}

noteInterceptMaxKey = "internalNoteIntercept_max"
noteInterceptMinKey = "internalNoteIntercept_min"
Expand Down Expand Up @@ -532,6 +535,11 @@ def rater_factor_key(i):
(expansionPlusNoteInterceptKey, np.double),
(expansionPlusNoteFactor1Key, np.double),
(expansionPlusRatingStatusKey, str),
(topicNoteInterceptKey, np.double),
(topicNoteFactor1Key, np.double),
(topicRatingStatusKey, str),
(noteTopicKey, str),
(topicNoteConfidentKey, str),
]
noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes]
noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes}
Expand Down Expand Up @@ -571,7 +579,7 @@ def rater_factor_key(i):
(groupRaterFactor1Key, np.double),
(modelingGroupKey, np.float64),
(raterHelpfulnessReputationKey, np.double),
(numberOfTimesEarnedOutKey, np.int64),
(numberOfTimesEarnedOutKey, np.float64),
]
raterModelOutputTSVColumns = [col for (col, dtype) in raterModelOutputTSVColumnsAndTypes]
raterModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in raterModelOutputTSVColumnsAndTypes}
Expand Down
73 changes: 63 additions & 10 deletions sourcecode/scoring/contributor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,28 @@ def is_earned_in(authorEnrollmentCounts):
)


def is_top_writer(authorEnrollmentCounts):
  """Flag the authors whose writing record qualifies them as top writers.

  An author is a top writer when both of the following hold:
    * writing impact (notes currently rated helpful minus notes currently rated
      not helpful) is at least c.topWriterWritingImpact, and
    * hit rate (writing impact divided by the total of helpful, not helpful and
      awaiting-more-ratings notes) is at least c.topWriterHitRate.

  Args:
    authorEnrollmentCounts (pd.DataFrame): Scored Notes + User Enrollment status
  Returns:
    Element-wise boolean result with one entry per row of authorEnrollmentCounts.
  """
  helpfulNotes = authorEnrollmentCounts[c.notesCurrentlyRatedHelpful]
  notHelpfulNotes = authorEnrollmentCounts[c.notesCurrentlyRatedNotHelpful]
  # Floor the note count at 1 so authors without any notes cannot cause a divide
  # by zero.  clip() is the vectorized form of max(count, 1) and avoids passing a
  # DataFrame-style axis argument to Series.apply.
  totalNotes = (
    helpfulNotes + notHelpfulNotes + authorEnrollmentCounts[c.notesAwaitingMoreRatings]
  ).clip(lower=1)
  writingImpact = helpfulNotes - notHelpfulNotes
  return (writingImpact >= c.topWriterWritingImpact) & (
    (writingImpact / totalNotes) >= c.topWriterHitRate
  )


def _get_rated_after_decision(
ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame
) -> pd.DataFrame:
Expand Down Expand Up @@ -302,6 +324,45 @@ def is_emerging_writer(scoredNotes: pd.DataFrame):
return emergingWriter[[c.noteAuthorParticipantIdKey, c.isEmergingWriterKey]]


def calculate_ri_to_earn_in(contributorScoresWithEnrollment: pd.DataFrame) -> pd.DataFrame:
  """Set the rating impact that earned-out users must reach to earn back in.

  Rows whose enrollment state equals the earnedOutNoAcknowledge value are treated
  as earned out.  For those rows this function:
    * sets c.successfulRatingNeededToEarnIn to
      ratingImpactForEarnIn + max(ratingImpact, 0)
      + ratingImpactForEarnIn * numberOfTimesEarnedOut;
    * for rows that is_top_writer also flags, overwrites that value with the
      non-escalating ratingImpactForEarnIn + max(ratingImpact, 0);
    * switches the enrollment state to the earnedOutAcknowledged value.

  The input frame is modified in place; the returned frame is the same data with
  the c.ratingImpact column dropped.

  Args:
    contributorScoresWithEnrollment (pd.DataFrame): contributor scores with
      enrollment state, including the c.ratingImpact and
      c.numberOfTimesEarnedOutKey columns and the note count columns read by
      is_top_writer.
  Returns:
    pd.DataFrame: the contributor scores with enrollments, without c.ratingImpact
  """
  earnedOutUsers = (
    contributorScoresWithEnrollment[c.enrollmentState]
    == c.enrollmentStateToThrift[c.earnedOutNoAcknowledge]
  )

  # Non-escalating requirement: negative rating impact is floored at zero so it
  # never lowers the bar below ratingImpactForEarnIn.
  baseRequirement = c.ratingImpactForEarnIn + contributorScoresWithEnrollment[
    c.ratingImpact
  ].clip(lower=0)
  # Each previous earn-out raises the bar by another ratingImpactForEarnIn.
  escalation = (
    c.ratingImpactForEarnIn * contributorScoresWithEnrollment[c.numberOfTimesEarnedOutKey]
  )

  contributorScoresWithEnrollment.loc[earnedOutUsers, c.successfulRatingNeededToEarnIn] = (
    baseRequirement + escalation
  ).loc[earnedOutUsers]

  # for top writers, overwrite the score required to earn in with non-escalating version
  topWriters = is_top_writer(contributorScoresWithEnrollment)
  earnedOutTopWriters = earnedOutUsers & topWriters
  contributorScoresWithEnrollment.loc[
    earnedOutTopWriters, c.successfulRatingNeededToEarnIn
  ] = baseRequirement.loc[earnedOutTopWriters]

  # The mask was computed before this assignment, so it still selects the rows
  # that arrived in the earnedOutNoAcknowledge state.
  contributorScoresWithEnrollment.loc[
    earnedOutUsers, c.enrollmentState
  ] = c.enrollmentStateToThrift[c.earnedOutAcknowledged]

  return contributorScoresWithEnrollment.drop(columns=[c.ratingImpact])


def get_contributor_state(
scoredNotes: pd.DataFrame,
ratings: pd.DataFrame,
Expand Down Expand Up @@ -409,18 +470,10 @@ def get_contributor_state(
contributorScoresWithEnrollment.loc[earnedOutUsers, c.numberOfTimesEarnedOutKey] + 1
)

contributorScoresWithEnrollment.loc[
earnedOutUsers, c.successfulRatingNeededToEarnIn
] = contributorScoresWithEnrollment.loc[earnedOutUsers].apply(
lambda row: c.ratingImpactForEarnIn
+ max([row[c.ratingImpact], 0])
+ (c.ratingImpactForEarnIn * row[c.numberOfTimesEarnedOutKey]),
axis=1,
)

# use earned out no ack internally to identify newly earned out users
contributorScoresWithEnrollment.loc[
earnedOutUsers, c.enrollmentState
] = c.enrollmentStateToThrift[c.earnedOutAcknowledged]
] = c.enrollmentStateToThrift[c.earnedOutNoAcknowledge]

contributorScoresWithEnrollment.loc[
is_earned_in(contributorScoresWithEnrollment), c.enrollmentState
Expand Down
10 changes: 10 additions & 0 deletions sourcecode/scoring/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ class Scorers(Enum):
MFGroupScorer = auto()
MFExpansionPlusScorer = auto()
ReputationScorer = auto()
MFTopicScorer = auto()


class Topics(Enum):
  """Range of the note topic model.

  Each member is one label in that range; values are explicit integers rather
  than auto() so they stay fixed if members are reordered or added.
  """

  # NOTE(review): presumably the label for notes that match no topic - confirm
  # against the topic model.
  Unassigned = 0
  UkraineConflict = 1
  GazaConflict = 2
  MessiRonaldo = 3


def scorers_from_csv(csv: str) -> Set[Scorers]:
Expand Down
41 changes: 41 additions & 0 deletions sourcecode/scoring/mf_base_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,47 @@
import torch


def coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame:
  """Condense all columns named "<columnPrefix>_..." into a single column.

  With each row there must be at most one column with a non-NaN value in the set of
  columns beginning with "<columnPrefix>_".  If a non-NaN value is present that will
  become the value in the condensed column, otherwise the value will be NaN.  After
  column values are condensed the original (prefixed) columns will be dropped.

  Args:
    df: DataFrame containing columns to condense
    columnPrefix: Prefix used to detect columns to coalesce, and the name for
      the output column.
  Returns:
    DataFrame with all columns prefixed by "<columnPrefix>_" dropped and replaced by
    a single column named columnPrefix.  When no column carries the prefix, df
    itself is returned unchanged.
  Raises:
    AssertionError if multiple columns prefixed by "<columnPrefix>_" have non-NaN
      values for any row.
  """
  # Identify columns to coalesce
  columns = [col for col in df.columns if col.startswith(f"{columnPrefix}_")]
  if not columns:
    return df
  prefixed = df[columns]
  # Validate that at most one column is set per row.  This raises explicitly
  # (instead of using an assert statement) so the check is not stripped when
  # Python runs with -O; the exception type is unchanged.
  setCounts = prefixed.notna().sum(axis=1)
  if (setCounts > 1).any():
    raise AssertionError(
      f"each row should have at most one non-NaN value among the '{columnPrefix}_*' columns"
    )

  # Coalesce results
  def _get_value(row):
    idx = row.first_valid_index()
    return row[idx] if idx is not None else np.nan

  coalesced = prefixed.apply(_get_value, axis=1)
  # Drop old columns and replace with new
  df = df.drop(columns=columns)
  df[columnPrefix] = coalesced
  return df


def get_ratings_for_stable_init(
ratingsForTraining: pd.DataFrame,
userEnrollmentRaw: pd.DataFrame,
Expand Down
1 change: 1 addition & 0 deletions sourcecode/scoring/mf_core_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def get_helpfulness_scores_cols(self) -> List[str]:

def _filter_input(
self,
noteTopics: pd.DataFrame,
ratingsOrig: pd.DataFrame,
noteStatusHistoryOrig: pd.DataFrame,
userEnrollment: pd.DataFrame,
Expand Down
1 change: 1 addition & 0 deletions sourcecode/scoring/mf_expansion_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def _get_dropped_user_cols(self) -> List[str]:

def _filter_input(
self,
noteTopics: pd.DataFrame,
ratingsOrig: pd.DataFrame,
noteStatusHistoryOrig: pd.DataFrame,
userEnrollment: pd.DataFrame,
Expand Down
54 changes: 8 additions & 46 deletions sourcecode/scoring/mf_group_scorer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import Dict, List, Optional, Tuple

from . import constants as c
from .mf_base_scorer import MFBaseScorer
from .mf_base_scorer import MFBaseScorer, coalesce_columns

import numpy as np
import pandas as pd


Expand All @@ -21,47 +20,6 @@
}


def _coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame:
  """Merge every "<columnPrefix>_..." column of df into one column named columnPrefix.

  Each row may carry a non-NaN value in at most one of the prefixed columns.  That
  value (or NaN when none is set) becomes the row's entry in the merged column, and
  the prefixed columns are removed from the result.

  Args:
    df: DataFrame holding the columns to merge
    columnPrefix: Prefix that selects the columns to merge; also the name of the
      merged output column.
  Returns:
    DataFrame where the prefixed columns are replaced by a single column named
    columnPrefix, or df itself when no column carries the prefix.
  Raises:
    AssertionError if any row has non-NaN values in more than one prefixed column.
  """
  marker = f"{columnPrefix}_"
  prefixed = [name for name in df.columns if name.startswith(marker)]
  if len(prefixed) == 0:
    return df

  subset = df[prefixed]
  # Count populated cells per row; more than one means the row is ambiguous.
  populated = (~subset.isna()).sum(axis=1)
  assert all(populated <= 1), "each row should only be in one modeling group"

  def _first_present(row):
    key = row.first_valid_index()
    if key is None:
      return np.nan
    return row[key]

  merged = subset.apply(_first_present, axis=1)
  # Swap the prefixed columns for the merged one.
  result = df.drop(columns=prefixed)
  result[columnPrefix] = merged
  return result


def coalesce_group_models(
scoredNotes: pd.DataFrame, helpfulnessScores: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
Expand All @@ -87,10 +45,10 @@ def coalesce_group_models(
c.groupNoteInterceptMinKey,
c.modelingGroupKey,
]:
scoredNotes = _coalesce_columns(scoredNotes, col)
scoredNotes = coalesce_columns(scoredNotes, col)

for col in [c.groupRaterInterceptKey, c.groupRaterFactor1Key, c.modelingGroupKey]:
helpfulnessScores = _coalesce_columns(helpfulnessScores, col)
helpfulnessScores = coalesce_columns(helpfulnessScores, col)

return scoredNotes, helpfulnessScores

Expand Down Expand Up @@ -251,7 +209,11 @@ def _get_dropped_user_cols(self) -> List[str]:
]

def _filter_input(
self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollment: pd.DataFrame
self,
noteTopics: pd.DataFrame,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
userEnrollment: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Prune the contents of ratings to only include ratings from users in the modeling group.
Expand Down
Loading

0 comments on commit 17048d4

Please sign in to comment.