Skip to content

Commit

Permalink
Scorer update: stable initialization, incorrect tag update, refactoring
Browse files Browse the repository at this point in the history
- Dramatically improve matrix factorization optimization stability by initializing with a first pass over users whose parameters are well estimated.
-add 5 new fields to note status history, with more detail about each of the different scorers
-update TF-IDF incorrect tag filtering
-move most logic in main.py to runner.py
-add a 13th group model
  • Loading branch information
jbaxter committed Sep 21, 2023
1 parent 8587686 commit 392bb23
Show file tree
Hide file tree
Showing 14 changed files with 337 additions and 204 deletions.
112 changes: 1 addition & 111 deletions sourcecode/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,117 +10,7 @@
--outdir data
"""

import argparse
import os

import scoring.constants as c
from scoring.enums import scorers_from_csv
from scoring.process_data import LocalDataLoader, write_tsv_local
from scoring.run_scoring import run_scoring


def parse_args(argv=None):
  """Parse command-line arguments for the Community Notes scoring pipeline.

  Args:
    argv: optional list of argument strings to parse. Defaults to None, in
      which case argparse reads sys.argv[1:], preserving the original
      zero-argument call signature for existing callers.

  Returns:
    argparse.Namespace containing all scoring options.
  """
  parser = argparse.ArgumentParser("Community Notes Scoring")
  parser.add_argument(
    "-e", "--enrollment", default=c.enrollmentInputPath, help="note enrollment dataset"
  )
  parser.add_argument(
    "--epoch-millis",
    default=None,
    type=float,
    dest="epoch_millis",
    help="timestamp in milliseconds since epoch to treat as now",
  )
  parser.add_argument(
    "--headers",
    dest="headers",
    help="First row of input files should be a header",
    action="store_true",
  )
  parser.add_argument(
    "--noheaders",
    dest="headers",
    help="First row of input files should be data. There should be no headers.",
    action="store_false",
  )
  parser.set_defaults(headers=True)
  parser.add_argument("-n", "--notes", default=c.notesInputPath, help="note dataset")
  parser.add_argument("-o", "--outdir", default=".", help="directory for output files")
  parser.add_argument(
    "--pseudoraters",
    dest="pseudoraters",
    help="Include calculation of pseudorater intervals",
    action="store_true",
  )
  parser.add_argument(
    "--nopseudoraters",
    dest="pseudoraters",
    help="Exclude calculation of pseudorater intervals (faster)",
    action="store_false",
  )
  parser.set_defaults(pseudoraters=True)
  parser.add_argument("-r", "--ratings", default=c.ratingsInputPath, help="rating dataset")
  parser.add_argument(
    "--scorers", default=None, type=scorers_from_csv, help="CSV list of scorers to enable."
  )
  parser.add_argument(
    "--seed", default=None, type=int, help="set to an int to seed matrix factorization"
  )
  parser.add_argument(
    "-s", "--status", default=c.noteStatusHistoryInputPath, help="note status history dataset"
  )
  parser.add_argument(
    "--strict-columns",
    dest="strict_columns",
    help="Explicitly select columns and require that expected columns are present.",
    action="store_true",
  )
  parser.add_argument(
    "--nostrict-columns",
    help="Disable validation of expected columns and allow unexpected columns.",
    action="store_false",
    dest="strict_columns",
  )
  parser.set_defaults(strict_columns=True)
  parser.add_argument(
    "--parallel",
    # Fixed help text: this flag ENABLES the parallel run (store_true with a
    # False default); the previous text incorrectly said "Disable".
    help="Enable parallel run of algorithm.",
    action="store_true",
    dest="parallel",
  )
  parser.set_defaults(parallel=False)

  return parser.parse_args(argv)


def main():
  """Entry point: parse arguments, load data, run scoring, write TSV outputs.

  Side effects:
    May override c.epochMillis, and writes four TSV files into args.outdir.
  """
  # Parse arguments and fix timestamp, if applicable.
  args = parse_args()
  # Compare against None rather than truthiness so an explicit epoch of 0
  # (the epoch itself) is honored instead of being silently ignored.
  if args.epoch_millis is not None:
    c.epochMillis = args.epoch_millis

  # Load input dataframes.
  dataLoader = LocalDataLoader(args.notes, args.ratings, args.status, args.enrollment, args.headers)
  _, ratings, statusHistory, userEnrollment = dataLoader.get_data()

  # Invoke scoring and user contribution algorithms.
  scoredNotes, helpfulnessScores, newStatus, auxNoteInfo = run_scoring(
    ratings,
    statusHistory,
    userEnrollment,
    seed=args.seed,
    pseudoraters=args.pseudoraters,
    enabledScorers=args.scorers,
    strictColumns=args.strict_columns,
    runParallel=args.parallel,
    # The loader is only needed by the parallel path; idiomatic truthiness
    # check replaces the prior "== True" comparison.
    dataLoader=dataLoader if args.parallel else None,
  )

  # Write outputs to local disk.
  write_tsv_local(scoredNotes, os.path.join(args.outdir, "scored_notes.tsv"))
  write_tsv_local(helpfulnessScores, os.path.join(args.outdir, "helpfulness_scores.tsv"))
  write_tsv_local(newStatus, os.path.join(args.outdir, "note_status_history.tsv"))
  write_tsv_local(auxNoteInfo, os.path.join(args.outdir, "aux_note_info.tsv"))
from scoring.runner import main


if __name__ == "__main__":
Expand Down
18 changes: 13 additions & 5 deletions sourcecode/scoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,9 @@ def rater_factor_key(i):

incorrectFilterColumns = [
"notHelpfulIncorrect_interval",
"cnt_interval",
"p_incorrect_user_interval",
"num_voters_interval",
"tf_idf_incorrect_interval",
"notHelpfulIncorrect_same",
"cnt_same",
"num_voters_same",
"tf_idf_incorrect_same",
]

misleadingTags = [
Expand Down Expand Up @@ -314,6 +310,11 @@ def rater_factor_key(i):
timestampMillisOfStatusLockKey = "timestampMillisOfStatusLock"
lockedStatusKey = "lockedStatus"
timestampMillisOfRetroLockKey = "timestampMillisOfRetroLock"
currentCoreStatusKey = "currentCoreStatus"
currentExpansionStatusKey = "currentExpansionStatus"
currentGroupStatusKey = "currentGroupStatus"
currentDecidedByKey = "currentDecidedBy"
currentModelingGroupKey = "currentModelingGroup"

noteStatusHistoryTSVColumnsAndTypes = [
(noteIdKey, np.int64),
Expand All @@ -328,13 +329,19 @@ def rater_factor_key(i):
(timestampMillisOfStatusLockKey, np.double), # double because nullable.
(lockedStatusKey, object),
(timestampMillisOfRetroLockKey, np.double), # double because nullable.
(currentCoreStatusKey, object),
(currentExpansionStatusKey, object),
(currentGroupStatusKey, object),
(currentDecidedByKey, object),
(currentModelingGroupKey, object),
]
noteStatusHistoryTSVColumns = [col for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes]
noteStatusHistoryTSVTypes = [dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes]
noteStatusHistoryTSVTypeMapping = {
col: dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes
}


# Earn In + Earn Out
enrollmentState = "enrollmentState"
successfulRatingNeededToEarnIn = "successfulRatingNeededToEarnIn"
Expand Down Expand Up @@ -491,6 +498,7 @@ def rater_factor_key(i):
(groupNoteInterceptMinKey, np.double),
(modelingGroupKey, np.float64),
(numRatingsKey, np.int64),
(timestampMillisOfNoteCurrentLabelKey, np.double),
]
noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes]
noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes}
Expand Down
67 changes: 30 additions & 37 deletions sourcecode/scoring/incorrect_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,40 @@
import pandas as pd


def _get_user_incorrect_ratio(ratings: pd.DataFrame) -> pd.DataFrame:
def _get_user_incorrect_ratio(nhTagRatings: pd.DataFrame) -> pd.DataFrame:
"""Computes empirical p(incorrect | not helpful tags assigned) per rater.
Args:
ratings: initial input ratings DF containing all ratings
nhTagRatings: DF containing all ratings with some NH tag
Returns:
pd.DataFrame containing one row per user who assigned not helpful tags with their empirical propensity
to assign "incorrect" tag
"""

notHelpfulTaggedRatings = ratings.loc[ratings[c.notHelpfulTagsTSVOrder].sum(axis=1) > 0]
user_incorrect = (
notHelpfulTaggedRatings[[c.raterParticipantIdKey, "notHelpfulIncorrect"]]
nhTagRatings[[c.raterParticipantIdKey, "notHelpfulIncorrect"]]
.groupby(c.raterParticipantIdKey)
.agg("sum")
)
user_nh_rating_count = (
notHelpfulTaggedRatings[[c.raterParticipantIdKey, c.noteIdKey]]
nhTagRatings[[c.raterParticipantIdKey, c.noteIdKey]]
.groupby(c.raterParticipantIdKey)
.agg("count")
)
user_nh_rating_count.rename(columns={c.noteIdKey: "cnt"}, inplace=True)
user_totals = user_incorrect.merge(user_nh_rating_count, on=c.raterParticipantIdKey)

note_totals = (
notHelpfulTaggedRatings[[c.raterParticipantIdKey, c.noteIdKey]]
.groupby(c.noteIdKey)
.agg("count")
.reset_index()
)
note_totals.rename(columns={c.raterParticipantIdKey: "num_voters"}, inplace=True)
return user_totals, note_totals
return user_totals


def _get_incorrect_tfidf_ratio(
augmented_ratings: pd.DataFrame, note_nh_count: pd.DataFrame, user_filter: bool, suffix: str
augmented_ratings: pd.DataFrame, user_filter: bool, suffix: str
) -> pd.DataFrame:
"""Computes empirical p(incorrect | note) / p(incorrect | raters over all notes) subject to rater-note inclusion function.
Args:
augmented_ratings: ratings DF with note and rater factors and user incorrect TF
note_nh_count: DF with total number of NH votes per note
filter: inclusion criteria for "incorrect" voters
suffix: suffix for incorrect and count column names for this filter
Expand All @@ -59,6 +50,14 @@ def _get_incorrect_tfidf_ratio(

ratings_w_user_totals = augmented_ratings[user_filter]

note_nh_count = (
ratings_w_user_totals[[c.raterParticipantIdKey, c.noteIdKey]]
.groupby(c.noteIdKey)
.agg("count")
.reset_index()
)
note_nh_count.rename(columns={c.raterParticipantIdKey: "num_voters"}, inplace=True)

columns_to_attempt_to_drop = [
c.internalRaterFactor1Key,
c.internalNoteFactor1Key,
Expand All @@ -67,15 +66,17 @@ def _get_incorrect_tfidf_ratio(
columns_to_drop = ratings_w_user_totals.columns.intersection(columns_to_attempt_to_drop)
ratings_w_user_totals.drop(columns_to_drop, inplace=True, axis=1)

ratings_w_user_totals["p_incorrect_user"] = (
ratings_w_user_totals["notHelpfulIncorrect_total"] / ratings_w_user_totals["cnt"]
)

rating_aggs = ratings_w_user_totals.groupby(c.noteIdKey).agg("sum").reset_index()
rating_aggs_w_cnt = rating_aggs.merge(note_nh_count, on=c.noteIdKey)

rating_aggs_w_cnt["tf_idf_incorrect"] = (
rating_aggs_w_cnt["notHelpfulIncorrect"] / rating_aggs_w_cnt["num_voters"]
) / np.log(
1 + (rating_aggs_w_cnt["notHelpfulIncorrect_total"] / rating_aggs_w_cnt["cnt"])
rating_aggs_w_cnt["tf_idf_incorrect"] = (rating_aggs_w_cnt["notHelpfulIncorrect"]) / np.log(
1 + (rating_aggs_w_cnt["p_incorrect_user"])
) # p(incorrect over all rater ratings)
rating_aggs_w_cnt.drop("notHelpfulIncorrect_total", inplace=True, axis=1)
rating_aggs_w_cnt.drop(["notHelpfulIncorrect_total", "cnt"], inplace=True, axis=1)
rating_aggs_w_cnt.columns = [c.noteIdKey] + [
f"{col}{suffix}" for col in rating_aggs_w_cnt.columns[1:]
]
Expand All @@ -97,12 +98,14 @@ def get_incorrect_aggregates(
aggregates for the Not-Helpful tags, including raw totals, totals adjusted based on the
distance between the rater and the note and ratios based on the adjusted weight totals.
"""
# consider only ratings with some NH tag
notHelpfulTaggedRatings = ratings.loc[ratings[c.notHelpfulTagsTSVOrder].sum(axis=1) > 0]

# get per user incorrect term frequency
user_totals, note_totals = _get_user_incorrect_ratio(ratings)
user_totals = _get_user_incorrect_ratio(notHelpfulTaggedRatings)
# add user and note factors
ratings_w_user_totals = (
ratings[[c.raterParticipantIdKey, c.noteIdKey, "notHelpfulIncorrect"]]
notHelpfulTaggedRatings[[c.raterParticipantIdKey, c.noteIdKey, "notHelpfulIncorrect"]]
.merge(user_totals, on=c.raterParticipantIdKey, suffixes=(None, "_total"))
.merge(noteParams[[c.noteIdKey, c.internalNoteFactor1Key]], on=c.noteIdKey)
.merge(
Expand All @@ -112,24 +115,14 @@ def get_incorrect_aggregates(

interval_filter = (
np.abs(
ratings_w_user_totals[c.internalRaterFactor1Key]
- ratings_w_user_totals[c.internalNoteFactor1Key]
ratings_w_user_totals[c.internalRaterFactor1Key].clip(-0.4, 0.4)
- ratings_w_user_totals[c.internalNoteFactor1Key].clip(-0.4, 0.4)
)
< c.intervalHalfWidth
)
interval_scores = _get_incorrect_tfidf_ratio(
ratings_w_user_totals, note_totals, interval_filter, "_interval"
)
same_factor_filter = (
(ratings_w_user_totals[c.internalRaterFactor1Key] > 0)
& (ratings_w_user_totals[c.internalNoteFactor1Key] > 0)
) | (
(ratings_w_user_totals[c.internalRaterFactor1Key] < 0)
& (ratings_w_user_totals[c.internalNoteFactor1Key] < 0)
)
same_scores = _get_incorrect_tfidf_ratio(
ratings_w_user_totals, note_totals, same_factor_filter, "_same"

incorrectAggregates = _get_incorrect_tfidf_ratio(
ratings_w_user_totals, interval_filter, "_interval"
)

incorrectAggregates = interval_scores.merge(same_scores, on=c.noteIdKey)
return incorrectAggregates
12 changes: 12 additions & 0 deletions sourcecode/scoring/matrix_factorization/matrix_factorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def _initialize_parameters(
globalInterceptInit: Optional[float] = None,
) -> None:
"""Overwrite the parameters of the model with the given initializations.
Set parameters to 0.0 if they were not set in the passed initializations.
Args:
mf_model (BiasedMatrixFactorization)
Expand All @@ -135,9 +136,14 @@ def _initialize_parameters(
if self._logging:
print("initializing notes")
noteInit = self.noteIdMap.merge(noteInit, on=c.noteIdKey, how="left")

noteInit[c.internalNoteInterceptKey].fillna(0.0, inplace=True)
self.mf_model.note_intercepts.weight.data = torch.tensor(
np.expand_dims(noteInit[c.internalNoteInterceptKey].astype(np.float32).values, axis=1)
)

for i in range(1, self._numFactors + 1):
noteInit[c.note_factor_key(i)].fillna(0.0, inplace=True)
self.mf_model.note_factors.weight.data = torch.tensor(
noteInit[[c.note_factor_key(i) for i in range(1, self._numFactors + 1)]]
.astype(np.float32)
Expand All @@ -148,9 +154,14 @@ def _initialize_parameters(
if self._logging:
print("initializing users")
userInit = self.raterIdMap.merge(userInit, on=c.raterParticipantIdKey, how="left")

userInit[c.internalRaterInterceptKey].fillna(0.0, inplace=True)
self.mf_model.user_intercepts.weight.data = torch.tensor(
np.expand_dims(userInit[c.internalRaterInterceptKey].astype(np.float32).values, axis=1)
)

for i in range(1, self._numFactors + 1):
userInit[c.rater_factor_key(i)].fillna(0.0, inplace=True)
self.mf_model.user_factors.weight.data = torch.tensor(
userInit[[c.rater_factor_key(i) for i in range(1, self._numFactors + 1)]]
.astype(np.float32)
Expand Down Expand Up @@ -412,6 +423,7 @@ def run_mf(
globalIntercept: learned global intercept parameter
"""
self._initialize_note_and_rater_id_maps(ratings)

self._create_mf_model(noteInit, userInit, globalInterceptInit)
assert self.mf_model is not None

Expand Down
Loading

0 comments on commit 392bb23

Please sign in to comment.