From 167be8319bcc86a4767c7e038e348c95ef8cf2c9 Mon Sep 17 00:00:00 2001 From: Jay Baxter Date: Wed, 21 Aug 2024 09:21:03 -0700 Subject: [PATCH] New NSH columns and cleanup -Use logger -Add new note status history columns (timestampMinuteOfFinalScoringOutput, timestampMillisOfFirstNmrDueToMinStableCrhTime) -Only apply 1 multi-group model --- sourcecode/scoring/constants.py | 13 +- sourcecode/scoring/contributor_state.py | 57 +++--- sourcecode/scoring/helpfulness_scores.py | 26 +-- .../matrix_factorization.py | 71 ++++---- .../scoring/matrix_factorization/model.py | 13 +- .../matrix_factorization/pseudo_raters.py | 15 +- sourcecode/scoring/mf_base_scorer.py | 31 ++-- sourcecode/scoring/note_ratings.py | 29 ++-- sourcecode/scoring/note_status_history.py | 23 ++- .../scoring/post_selection_similarity.py | 9 +- sourcecode/scoring/process_data.py | 85 ++++----- .../diligence_model.py | 9 +- .../helpfulness_model.py | 9 +- .../reputation_matrix_factorization.py | 41 +++-- sourcecode/scoring/reputation_scorer.py | 11 +- sourcecode/scoring/run_scoring.py | 162 +++++++++--------- sourcecode/scoring/runner.py | 9 +- sourcecode/scoring/scorer.py | 28 +-- sourcecode/scoring/scoring_rules.py | 34 ++-- sourcecode/scoring/tag_consensus.py | 21 ++- sourcecode/scoring/tag_filter.py | 7 +- sourcecode/scoring/topic_model.py | 21 ++- 22 files changed, 421 insertions(+), 303 deletions(-) diff --git a/sourcecode/scoring/constants.py b/sourcecode/scoring/constants.py index 214344a6..cd770bc3 100644 --- a/sourcecode/scoring/constants.py +++ b/sourcecode/scoring/constants.py @@ -1,6 +1,7 @@ from contextlib import contextmanager from dataclasses import dataclass from enum import Enum +import logging import os import time from typing import Dict, Optional, Set @@ -9,6 +10,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.constants") +logger.setLevel(logging.INFO) + + # Default number of threads to use in torch if os.cpu_count() is unavailable # and no value is specified. defaultNumThreads = os.cpu_count() or 8 @@ -461,6 +466,8 @@ def rater_factor_key(i): updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey = ( "updatedTimestampMillisOfNmrDueToMinStableCrhTime" ) +timestampMinuteOfFinalScoringOutput = "timestampMinuteOfFinalScoringOutput" +timestampMillisOfFirstNmrDueToMinStableCrhTimeKey = "timestampMillisOfFirstNmrDueToMinStableCrhTime" noteStatusHistoryTSVColumnsAndTypes = [ (noteIdKey, np.int64), @@ -484,6 +491,8 @@ def rater_factor_key(i): (timestampMillisOfNmrDueToMinStableCrhTimeKey, np.double), # double because nullable. (currentMultiGroupStatusKey, "category"), (currentModelingMultiGroupKey, np.double), # TODO: int + (timestampMinuteOfFinalScoringOutput, np.double), # double because nullable. + (timestampMillisOfFirstNmrDueToMinStableCrhTimeKey, np.double), # double because nullable. ] noteStatusHistoryTSVColumns = [col for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes] noteStatusHistoryTSVTypes = [dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes] @@ -818,8 +827,6 @@ def rater_factor_key(i): inputPathsTSVColumns = [col for (col, _) in inputPathsTSVColumnsAndTypes] inputPathsTSVTypeMapping = {col: dtype for (col, dtype) in inputPathsTSVColumnsAndTypes} -timestampMinuteOfFinalScoringOutput = "timestampMinuteOfFinalScoringOutput" - @contextmanager def time_block(label): @@ -828,7 +835,7 @@ def time_block(label): yield finally: end = time.time() - print(f"{label} elapsed time: {end - start:.2f} secs ({((end - start) / 60.0):.2f} mins)") + logger.info(f"{label} elapsed time: {end - start:.2f} secs ({((end - start) / 60.0):.2f} mins)") ### TODO: weave through second round intercept. diff --git a/sourcecode/scoring/contributor_state.py b/sourcecode/scoring/contributor_state.py index d2c17100..be8e2841 100644 --- a/sourcecode/scoring/contributor_state.py +++ b/sourcecode/scoring/contributor_state.py @@ -1,3 +1,5 @@ +import logging + from . import constants as c, explanation_tags from .helpfulness_scores import author_helpfulness from .note_ratings import get_ratings_with_scores, get_valid_ratings @@ -5,6 +7,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.contributor_state") +logger.setLevel(logging.INFO) + + def should_earn_in(contributorScoresWithEnrollment: pd.DataFrame): """ The participant should earn in when they are in the earnedOutAcknowledged, earnedoutNoAck and newUser state. @@ -124,21 +130,21 @@ def _get_rated_after_decision( assert ( len(ratingInfos) == len(ratings) ), f"assigning a status timestamp shouldn't decrease number of ratings: {len(ratingInfos)} vs. {len(ratings)}" - print("Calculating ratedAfterDecision:") - print(f" Total ratings: {len(ratingInfos)}") + logger.info("Calculating ratedAfterDecision:") + logger.info(f" Total ratings: {len(ratingInfos)}") ratingInfos = ratingInfos[~pd.isna(ratingInfos[c.timestampMillisOfNoteMostRecentNonNMRLabelKey])] - print(f" Total ratings on notes with status: {len(ratingInfos)}") + logger.info(f" Total ratings on notes with status: {len(ratingInfos)}") ratingInfos = ratingInfos[ ratingInfos[c.createdAtMillisKey] > ratingInfos[c.timestampMillisOfNoteMostRecentNonNMRLabelKey] ] - print(f" Total ratings after status: {len(ratingInfos)}") + logger.info(f" Total ratings after status: {len(ratingInfos)}") ratingInfos[c.ratedAfterDecision] = 1 ratedAfterDecision = ( ratingInfos[[c.raterParticipantIdKey, c.ratedAfterDecision]] .groupby(c.raterParticipantIdKey) .sum() ) - print(f" Total raters rating after decision: {len(ratedAfterDecision)}") + logger.info(f" Total raters rating after decision: {len(ratedAfterDecision)}") return ratedAfterDecision @@ -421,7 +427,7 @@ def get_contributor_state( ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollment: pd.DataFrame, - logging: bool = True, + log: bool = True, ) -> pd.DataFrame: """ Given scored notes, ratings, note status history, the current user enrollment state, this @@ -433,7 +439,7 @@ def get_contributor_state( ratings (pd.DataFrame): all ratings noteStatusHistory (pd.DataFrame): history of note statuses userEnrollment (pd.DataFrame): User enrollment for BW participants. - logging (bool): Should we log + log (bool): Should we log Returns: pd.DataFrame: contributorScoresWithEnrollment The contributor scores with enrollments """ @@ -582,27 +588,22 @@ def get_contributor_state( # users that do not have an id. contributorScoresWithEnrollment.dropna(subset=[c.raterParticipantIdKey], inplace=True) - if logging: - print("Enrollment State") - print( - "Number of Earned In", - len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 0]), + if log: + logger.info("Enrollment State") + logger.info( + f"Number of Earned In {len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 0])}" ) - print( - "Number At Risk", - len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 1]), + logger.info( + f"Number At Risk {len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 1])}" ) - print( - "Number of Earn Out No Ack", - len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 2]), + logger.info( + f"Number of Earn Out No Ack {len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 2])}" ) - print( - "Number of Earned Out Ack", - len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 3]), + logger.info( + f"Number of Earned Out Ack {len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 3])}" ) - print( - "Number of New Users", - len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 4]), + logger.info( + f"Number of New Users {len(contributorScoresWithEnrollment[contributorScoresWithEnrollment[c.enrollmentState] == 4])}" ) return contributorScoresWithEnrollment, mappedUserEnrollment @@ -615,7 +616,7 @@ def get_contributor_scores( lastNNotes=-1, countNMRNotesLast: bool = False, sinceLastEarnOut: bool = False, - logging: bool = True, + log: bool = True, ) -> pd.DataFrame: """ Given the outputs of the MF model, this function aggregates stats over notes and ratings. The @@ -628,7 +629,7 @@ def get_contributor_scores( lastNNotes (int): count over the last n notes countNMRNotesLast (bool): count NMR notes last. Useful when you want to calculate over a limited set of CRH + CRNH notes sinceLastEarnOut: only count notes since last Earn Out event - logging (bool): Should we log? + log (bool): Should we log? Returns: pd.DataFrame: contributorScores - rating + note aggregates per contributor. """ @@ -676,7 +677,7 @@ def get_contributor_scores( ] ) - if logging: - print("Number Contributor Counts: ", len(contributorCounts)) + if log: + logger.info(f"Number Contributor Counts: {len(contributorCounts)}") return contributorCounts diff --git a/sourcecode/scoring/helpfulness_scores.py b/sourcecode/scoring/helpfulness_scores.py index 0a907e36..d909a818 100644 --- a/sourcecode/scoring/helpfulness_scores.py +++ b/sourcecode/scoring/helpfulness_scores.py @@ -1,3 +1,4 @@ +import logging from typing import Optional from . import constants as c @@ -6,6 +7,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.helpfulness_scores") +logger.setLevel(logging.INFO) + + def author_helpfulness( scoredNotes: pd.DataFrame, noteInterceptKey: str, @@ -199,7 +204,7 @@ def compute_general_helpfulness_scores( def filter_ratings_by_helpfulness_scores( ratingsForTraining: pd.DataFrame, helpfulnessScores: pd.DataFrame, - logging: bool = True, + log: bool = True, ): """Filter out ratings from raters whose helpfulness scores are too low. See https://twitter.github.io/communitynotes/contributor-scores/#filtering-ratings-based-on-helpfulness-scores. @@ -207,7 +212,7 @@ def filter_ratings_by_helpfulness_scores( Args: ratingsForTraining pandas.DataFrame: unfiltered input ratings helpfulnessScores pandas.DataFrame: helpfulness scores to use to determine which raters to filter out. - logging (bool, optional): debug output. Defaults to True. + log (bool, optional): debug output. Defaults to True. Returns: filtered_ratings pandas.DataFrame: same schema as input ratings, but filtered. @@ -219,15 +224,14 @@ def filter_ratings_by_helpfulness_scores( ratingsForTraining, on=c.raterParticipantIdKey ) - if logging: - print("Unique Raters: ", len(np.unique(ratingsForTraining[c.raterParticipantIdKey]))) - print("People (Authors or Raters) With Helpfulness Scores: ", len(helpfulnessScores)) - print("Raters Included Based on Helpfulness Scores: ", len(includedUsers)) - print( - "Included Raters who have rated at least 1 note in the final dataset: ", - len(np.unique(ratingsHelpfulnessScoreFiltered[c.raterParticipantIdKey])), + if log: + logger.info(f"Unique Raters: {len(np.unique(ratingsForTraining[c.raterParticipantIdKey]))}") + logger.info(f"People (Authors or Raters) With Helpfulness Scores: {len(helpfulnessScores)}") + logger.info(f"Raters Included Based on Helpfulness Scores: {len(includedUsers)}") + logger.info( + f"Included Raters who have rated at least 1 note in the final dataset: {len(np.unique(ratingsHelpfulnessScoreFiltered[c.raterParticipantIdKey]))}", ) - print("Number of Ratings Used For 1st Training: ", len(ratingsForTraining)) - print("Number of Ratings for Final Training: ", len(ratingsHelpfulnessScoreFiltered)) + logger.info(f"Number of Ratings Used For 1st Training: {len(ratingsForTraining)}") + logger.info(f"Number of Ratings for Final Training: {len(ratingsHelpfulnessScoreFiltered)}") return ratingsHelpfulnessScoreFiltered diff --git a/sourcecode/scoring/matrix_factorization/matrix_factorization.py b/sourcecode/scoring/matrix_factorization/matrix_factorization.py index b57e8567..e512bdde 100644 --- a/sourcecode/scoring/matrix_factorization/matrix_factorization.py +++ b/sourcecode/scoring/matrix_factorization/matrix_factorization.py @@ -1,4 +1,5 @@ import dataclasses +import logging from typing import List, Optional, Tuple from .. import constants as c @@ -10,6 +11,10 @@ import torch +logger = logging.getLogger("birdwatch.matrix_factorization") +logger.setLevel(logging.INFO) + + @dataclasses.dataclass class Constants: noteIndexKey = "noteIndex" @@ -24,7 +29,7 @@ def __init__( convergence=1e-7, numFactors=1, useGlobalIntercept=True, - logging=True, + log=True, flipFactorsForIdentification=True, model: Optional[BiasedMatrixFactorization] = None, featureCols: List[str] = [c.noteIdKey, c.raterParticipantIdKey], @@ -45,7 +50,7 @@ def __init__( self._convergence = convergence self._numFactors = numFactors self._useGlobalIntercept = useGlobalIntercept - self._logging = logging + self._log = log self._flipFactorsForIdentification = flipFactorsForIdentification self._featureCols = featureCols self._labelCol = labelCol @@ -62,14 +67,14 @@ def __init__( if self._useSigmoidCrossEntropy: if self._posWeight: - if logging: - print(f"Using pos weight: {self._posWeight} with BCEWithLogitsLoss") + if log: + logger.info(f"Using pos weight: {self._posWeight} with BCEWithLogitsLoss") self.criterion = torch.nn.BCEWithLogitsLoss( pos_weight=torch.FloatTensor(np.array(self._posWeight)), reduction="none" ) else: - if logging: - print("Using BCEWithLogitsLoss") + if log: + logger.info("Using BCEWithLogitsLoss") self.criterion = torch.nn.BCEWithLogitsLoss(reduction="none") else: if self._posWeight: @@ -97,7 +102,7 @@ def get_new_mf_with_same_args(self): convergence=self._convergence, numFactors=self._numFactors, useGlobalIntercept=self._useGlobalIntercept, - logging=self._logging, + log=self._log, flipFactorsForIdentification=self._flipFactorsForIdentification, model=None, featureCols=self._featureCols, @@ -173,8 +178,8 @@ def _initialize_parameters( """ assert self.mf_model is not None if noteInit is not None: - if self._logging: - print("initializing notes") + if self._log: + logger.info("initializing notes") noteInit = self.noteIdMap.merge( noteInit, on=c.noteIdKey, @@ -196,8 +201,8 @@ def _initialize_parameters( ) if userInit is not None: - if self._logging: - print("initializing users") + if self._log: + logger.info("initializing users") userInit = self.raterIdMap.merge(userInit, on=c.raterParticipantIdKey, how="left") userInit[c.internalRaterInterceptKey].fillna(0.0, inplace=True) @@ -214,8 +219,8 @@ def _initialize_parameters( ) if globalInterceptInit is not None: - if self._logging: - print("initialized global intercept") + if self._log: + logger.info("initialized global intercept") self.mf_model.global_intercept = torch.nn.parameter.Parameter( torch.ones(1, 1, dtype=torch.float32) * globalInterceptInit ) @@ -268,15 +273,15 @@ def _create_mf_model( self._initialize_parameters(noteInit, userInit, globalInterceptInit) if (noteInit is not None) and (userInit is not None): - print(f"learning rate set to :{self._initLearningRate}") + logger.info(f"learning rate set to :{self._initLearningRate}") self.optimizer = torch.optim.Adam( self.mf_model.parameters(), lr=self._initLearningRate ) # smaller learning rate else: - print(f"learning rate set to :{self._noInitLearningRate}") + logger.info(f"learning rate set to :{self._noInitLearningRate}") self.optimizer = torch.optim.Adam(self.mf_model.parameters(), lr=self._noInitLearningRate) - if self._logging: - print(self.mf_model.device) + if self._log: + logger.info(f"{self.mf_model.device}") self.mf_model.to(self.mf_model.device) def _instantiate_biased_mf_model(self): @@ -287,11 +292,11 @@ def _instantiate_biased_mf_model(self): n_notes, use_global_intercept=self._useGlobalIntercept, n_factors=self._numFactors, - logging=self._logging, + log=self._log, ) - if self._logging: - print("------------------") - print(f"Users: {n_users}, Notes: {n_notes}") + if self._log: + logger.info("------------------") + logger.info(f"Users: {n_users}, Notes: {n_notes}") def _compute_and_print_loss( self, @@ -311,11 +316,11 @@ def _compute_and_print_loss( else: validate_loss_value = None - if self._logging: - print("epoch", epoch, loss_value) - print("TRAIN FIT LOSS: ", train_loss_value) + if self._log: + logger.info(f"epoch {epoch} {loss_value}") + logger.info(f"TRAIN FIT LOSS: {train_loss_value}") if validate_loss_value is not None: - print("VALIDATE FIT LOSS: ", validate_loss_value) + logger.info(f"VALIDATE FIT LOSS: {validate_loss_value}") if final == True: self.test_errors.append(loss_value) @@ -458,8 +463,8 @@ def _fit_model( epoch += 1 - if self._logging: - print("Num epochs:", epoch) + if self._log: + logger.info("Num epochs: {epoch}") return self._compute_and_print_loss(loss.item(), epoch, final=True) def prepare_features_and_labels( @@ -522,18 +527,18 @@ def run_mf( self._create_mf_model(noteInit, userInit, globalInterceptInit) assert self.mf_model is not None - print( + logger.info( f"Ratings per note in dataset: {len(ratings)/self.mf_model.note_factors.weight.data.shape[0]}" ) - print( + logger.info( f"Ratings per user in dataset: {len(ratings)/self.mf_model.user_factors.weight.data.shape[0]}" ) if ratingPerNoteLossRatio is not None: - print( + logger.info( f"Correcting loss function to simulate rating per note loss ratio = {ratingPerNoteLossRatio}" ) if ratingPerUserLossRatio is not None: - print( + logger.info( f"Correcting loss function to simulate rating per user loss ratio = {ratingPerUserLossRatio}" ) @@ -567,8 +572,8 @@ def run_mf( globalIntercept = None if self._useGlobalIntercept: globalIntercept = self.mf_model.global_intercept.item() - if self._logging: - print("Global Intercept: ", globalIntercept) + if self._log: + logger.info(f"Global Intercept: {globalIntercept}") fitNoteParams, fitRaterParams = self._get_parameters_from_trained_model() diff --git a/sourcecode/scoring/matrix_factorization/model.py b/sourcecode/scoring/matrix_factorization/model.py index 0a369547..b67c9ad1 100644 --- a/sourcecode/scoring/matrix_factorization/model.py +++ b/sourcecode/scoring/matrix_factorization/model.py @@ -1,9 +1,14 @@ from dataclasses import dataclass +import logging from typing import Optional import torch +logger = logging.getLogger("birdwatch.model") +logger.setLevel(logging.INFO) + + @dataclass class ModelData: rating_labels: Optional[torch.FloatTensor] @@ -20,7 +25,7 @@ def __init__( n_notes: int, n_factors: int = 1, use_global_intercept: bool = True, - logging: bool = True, + log: bool = True, ) -> None: """Initialize matrix factorization model using xavier_uniform for factors and zeros for intercepts. @@ -33,7 +38,7 @@ def __init__( """ super().__init__() - self._logging = logging + self._log = log self.user_factors = torch.nn.Embedding(n_users, n_factors, sparse=False, dtype=torch.float32) self.note_factors = torch.nn.Embedding(n_notes, n_factors, sparse=False, dtype=torch.float32) @@ -81,6 +86,6 @@ def _freeze_parameters(self, words_to_freeze: set): for name, param in self.named_parameters(): for word in words_to_freeze: if word in name: - if self._logging: - print("Freezing parameter: ", name) + if self._log: + logger.info(f"Freezing parameter: {name}") param.requires_grad_(False) diff --git a/sourcecode/scoring/matrix_factorization/pseudo_raters.py b/sourcecode/scoring/matrix_factorization/pseudo_raters.py index 5ea036a4..d7b29f5d 100644 --- a/sourcecode/scoring/matrix_factorization/pseudo_raters.py +++ b/sourcecode/scoring/matrix_factorization/pseudo_raters.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +import logging from .. import constants as c from .matrix_factorization import Constants as mf_c, MatrixFactorization @@ -8,6 +9,10 @@ import torch +logger = logging.getLogger("birdwatch.pseudo_raters") +logger.setLevel(logging.INFO) + + @dataclass class Constants: extraRaterInterceptKey = "extraRaterIntercept" @@ -28,10 +33,10 @@ def __init__( raterParams: pd.DataFrame, globalBias: float, mfRanker: MatrixFactorization, - logging=True, + log=True, checkParamsSame=True, ): - self._logging = logging + self._log = log self._mfRanker = mfRanker self._checkParamsSame = checkParamsSame self.ratings = ratings @@ -258,9 +263,9 @@ def _fit_note_params_for_each_dataset_with_extreme_ratings(self): self._create_dataset_with_extreme_rating_on_each_note(ratingToAddWithoutNoteId) ) - if self._logging: - print("------------------") - print(f"Re-scoring all notes with extra rating added: {ratingToAddWithoutNoteId}") + if self._log: + logger.info("------------------") + logger.info(f"Re-scoring all notes with extra rating added: {ratingToAddWithoutNoteId}") with c.time_block("Pseudo: fit all notes with raters constant"): fitNoteParams = self._fit_all_notes_with_raters_constant( diff --git a/sourcecode/scoring/mf_base_scorer.py b/sourcecode/scoring/mf_base_scorer.py index 840ce316..7860e49b 100644 --- a/sourcecode/scoring/mf_base_scorer.py +++ b/sourcecode/scoring/mf_base_scorer.py @@ -1,4 +1,5 @@ import gc +import logging from typing import Dict, List, Optional, Set, Tuple from . import ( @@ -24,6 +25,10 @@ import torch +logger = logging.getLogger("birdwatch.mf_base_scorer") +logger.setLevel(logging.INFO) + + def coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame: """Condense all columns beginning with columnPrefix into a single column. @@ -503,7 +508,7 @@ def _prescore_notes_and_users( helpfulnessScores (pd.DataFrame) """ if self._seed is not None: - print(f"seeding with {self._seed}") + logger.info(f"seeding with {self._seed}") torch.manual_seed(self._seed) # Removes ratings where either (1) the note did not receive enough ratings, or @@ -551,7 +556,7 @@ def _prescore_notes_and_users( # in situations where the overall volume of ratings is lower (e.g. topic models). if not self._useReputation: assert "Topic" in self.get_name(), f"Unexpected scorer: {self.get_name()}" - print(f"Skipping rep-filtering in prescoring for {self.get_name()}") + logger.info(f"Skipping rep-filtering in prescoring for {self.get_name()}") helpfulnessScores = raterParamsUnfiltered[[c.raterParticipantIdKey]] helpfulnessScores[ [ @@ -568,7 +573,7 @@ def _prescore_notes_and_users( finalRoundRatings = ratingsForTraining else: assert "Topic" not in self.get_name(), f"Unexpected scorer: {self.get_name()}" - print(f"Performing rep-filtering for {self.get_name()}") + logger.info(f"Performing rep-filtering for {self.get_name()}") # Get a dataframe of scored notes based on the algorithm results above with self.time_block("Compute scored notes"): scoredNotes = note_ratings.compute_scored_notes( @@ -763,7 +768,7 @@ def _prescore_notes_and_users( raterParamsDiligenceInit = raterParams[ [c.raterParticipantIdKey, c.internalRaterFactor1Key] ].rename({c.internalRaterFactor1Key: c.lowDiligenceRaterFactor1Key}, axis=1) - print( + logger.info( f"In {self.get_name()} prescoring, about to call diligence with {len(finalRoundRatings)} final round ratings." ) ( @@ -921,7 +926,7 @@ def _score_notes_and_users( userScores pd.DataFrame: one row per user containing a column for each helpfulness score. """ if self._seed is not None: - print(f"seeding with {self._seed}") + logger.info(f"seeding with {self._seed}") torch.manual_seed(self._seed) # Removes ratings where either the note did not receive enough ratings @@ -948,7 +953,7 @@ def _score_notes_and_users( assert ( "Topic" in self.get_name() ), f"Unexpected scorer has reputation filtering disabled: {self.get_name()}" - print(f"Skipping rep-filtering in 2nd phase for {self.get_name()}") + logger.info(f"Skipping rep-filtering in 2nd phase for {self.get_name()}") finalRoundRatings = ratingsForTraining else: finalRoundRatings = helpfulness_scores.filter_ratings_by_helpfulness_scores( @@ -1003,7 +1008,7 @@ def _score_notes_and_users( # Add low diligence intercepts. with self.time_block("Low Diligence Reputation Model"): - print( + logger.info( f"In {self.get_name()} final scoring, about to call diligence with {len(finalRoundRatings)} final round ratings." ) assert ( @@ -1019,9 +1024,9 @@ def _score_notes_and_users( ratingsPerUserLossRatio=prescoringMetaScorerOutput.finalRoundNumRatings / prescoringMetaScorerOutput.finalRoundNumUsers, ) - print(f"diligenceNP cols: {diligenceNoteParams.columns}") + logger.info(f"diligenceNP cols: {diligenceNoteParams.columns}") noteParams = noteParams.merge(diligenceNoteParams, on=c.noteIdKey) - print(f"np cols: {noteParams.columns}") + logger.info(f"np cols: {noteParams.columns}") if self._saveIntermediateState: self.noteParams = noteParams @@ -1038,7 +1043,7 @@ def _score_notes_and_users( # Assigns updated CRH / CRNH bits to notes based on volume of prior ratings # and ML output. with self.time_block("Final compute scored notes"): - print(f"About to call compute_scored_notes with {self.get_name()}") + logger.info(f"About to call compute_scored_notes with {self.get_name()}") scoredNotes = note_ratings.compute_scored_notes( ratings, noteParams, @@ -1059,7 +1064,7 @@ def _score_notes_and_users( factorThreshold=self._factorThreshold, firmRejectThreshold=self._firmRejectThreshold, ) - print(f"sn cols: {scoredNotes.columns}") + logger.info(f"sn cols: {scoredNotes.columns}") # Takes raterParams from the MF run, but use the pre-computed # helpfulness scores from prescoringRaterModelOutput. @@ -1093,7 +1098,7 @@ def score_final(self, scoringArgs: c.FinalScoringArgs) -> c.ModelResult: c.scorerNameKey field of those dataframes. """ torch.set_num_threads(self._threads) - print( + logger.info( f"score_final: Torch intra-op parallelism for {self.get_name()} set to: {torch.get_num_threads()}" ) @@ -1110,7 +1115,7 @@ def score_final(self, scoringArgs: c.FinalScoringArgs) -> c.ModelResult: ].drop(columns=c.scorerNameKey, inplace=False) if self.get_name() not in scoringArgs.prescoringMetaOutput.metaScorerOutput: - print( + logger.info( f"Scorer {self.get_name()} not found in prescoringMetaOutput; returning empty scores from final scoring." ) return self._return_empty_final_scores() diff --git a/sourcecode/scoring/note_ratings.py b/sourcecode/scoring/note_ratings.py index cd68db2b..e5930c99 100644 --- a/sourcecode/scoring/note_ratings.py +++ b/sourcecode/scoring/note_ratings.py @@ -1,4 +1,5 @@ from datetime import datetime, timedelta, timezone +import logging from typing import Callable, Dict, Optional from . import constants as c, incorrect_filter, scoring_rules, tag_filter @@ -8,6 +9,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.note_ratings") +logger.setLevel(logging.INFO) + + # Threshold limiting the number of ratings which can be counted as "valid" for the purpose of # determining rating performance for notes which were created before noteStatusHistory was # introduced. Notice that this value coincides with the minimum number of ratings necessary to @@ -43,7 +48,7 @@ def is_crnh_diamond( def get_ratings_before_note_status_and_public_tsv( ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, - logging: bool = True, + log: bool = True, doTypeCheck: bool = True, ) -> pd.DataFrame: """Determine which ratings are made before note's most recent non-NMR status, @@ -54,7 +59,7 @@ def get_ratings_before_note_status_and_public_tsv( Args: ratings (pd.DataFrame) noteStatusHistory (pd.DataFrame) - logging (bool, optional). Defaults to True. + log (bool, optional). Defaults to True. doTypeCheck (bool): do asserts to check types. Returns: pd.DataFrame combinedRatingsBeforeStatus ratings that were created early enough to be valid ratings @@ -140,11 +145,11 @@ def get_ratings_before_note_status_and_public_tsv( combinedRatingsBeforeStatus = pd.concat([ratingsBeforeStatusNewNotes, first5RatingsOldNotes]) - if logging: - print( + if log: + logger.info( f"Total ratings: {np.invert(noteCreatedBeforeNoteStatusHistory).sum()} post-tombstones and {(noteCreatedBeforeNoteStatusHistory).sum()} pre-tombstones" ) - print( + logger.info( f"Total ratings created before statuses: {len(combinedRatingsBeforeStatus)}, including {len(ratingsBeforeStatusNewNotes)} post-tombstones and {len(first5RatingsOldNotes)} pre-tombstones." ) @@ -156,7 +161,7 @@ def get_ratings_with_scores( ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, scoredNotes: pd.DataFrame, - logging: bool = True, + log: bool = True, doTypeCheck: bool = True, ) -> pd.DataFrame: """ @@ -170,7 +175,7 @@ def get_ratings_with_scores( pd.DataFrame: binaryRatingsOnNotesWithStatusLabels Binary ratings with status labels """ ratingsBeforeNoteStatus = get_ratings_before_note_status_and_public_tsv( - ratings, noteStatusHistory, logging, doTypeCheck + ratings, noteStatusHistory, log, doTypeCheck ) ratingsWithScores = ratingsBeforeNoteStatus[ @@ -193,7 +198,7 @@ def get_valid_ratings( ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, scoredNotes: pd.DataFrame, - logging: bool = True, + log: bool = True, doTypeCheck: bool = True, ) -> pd.DataFrame: """Determine which ratings are "valid" (used to determine rater helpfulness score) @@ -204,13 +209,13 @@ def get_valid_ratings( ratings (pd.DataFrame) noteStatusHistory (pd.DataFrame) scoredNotes (pd.DataFrame) - logging (bool, optional): Defaults to True. + log (bool, optional): Defaults to True. doTypeCheck (bool): do asserts to check types. Returns: pd.DataFrame: binaryRatingsOnNotesWithStatusLabels CRH/CRNH notes group by helpfulness """ ratingsWithScores = get_ratings_with_scores( - ratings, noteStatusHistory, scoredNotes, logging, doTypeCheck + ratings, noteStatusHistory, scoredNotes, log, doTypeCheck ) ratingsWithScores[c.ratingCountKey] = 1 @@ -271,8 +276,8 @@ def get_valid_ratings( helpfulRatingOnCrhNote | notHelpfulRatingOnCrnhNote, c.ratingAgreesWithNoteStatusKey ] = True - if logging: - print(f"Total valid ratings: {len(binaryRatingsOnNotesWithStatusLabels)}") + if log: + logger.info(f"Total valid ratings: {len(binaryRatingsOnNotesWithStatusLabels)}") return binaryRatingsOnNotesWithStatusLabels diff --git a/sourcecode/scoring/note_status_history.py b/sourcecode/scoring/note_status_history.py index b408c6b6..eb305930 100644 --- a/sourcecode/scoring/note_status_history.py +++ b/sourcecode/scoring/note_status_history.py @@ -1,3 +1,4 @@ +import logging import time from typing import Optional @@ -8,6 +9,9 @@ import pandas as pd +logger = logging.getLogger("birdwatch.note_status_history") +logger.setLevel(logging.INFO) + # Delay specifying when to lock note status, currently set to two weeks. _noteLockMillis = 14 * 24 * 60 * 60 * 1000 @@ -36,7 +40,7 @@ def merge_note_info(oldNoteStatusHistory: pd.DataFrame, notes: pd.DataFrame) -> unsafeAllowed={c.createdAtMillisKey, c.createdAtMillisKey + noteSuffix}, ) newNotes = pd.isna(newNoteStatusHistory[c.createdAtMillisKey]) - print(f"total notes added to noteStatusHistory: {sum(newNotes)}") + logger.info(f"total notes added to noteStatusHistory: {sum(newNotes)}") # Copy timestamp and authorship data over for new notes. newNoteStatusHistory.loc[newNotes, c.createdAtMillisKey] = newNoteStatusHistory.loc[ newNotes, c.createdAtMillisKey + noteSuffix @@ -89,10 +93,21 @@ def _update_single_note_status_history(mergedNote, currentTimeMillis, newScoredN Returns: row of pd.DataFrame """ + # This TS will be set by run_combine_scoring_outputs. + mergedNote[c.timestampMinuteOfFinalScoringOutput] = np.nan + + # TODO(jiansongc): remove after new column is in prod. + if c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey not in mergedNote: + mergedNote[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey] = np.nan + if not pd.isna(mergedNote[c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey]): mergedNote[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] = mergedNote[ c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey ] + if pd.isna(mergedNote[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey]): + mergedNote[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey] = mergedNote[ + c.timestampMillisOfNmrDueToMinStableCrhTimeKey + ] if mergedNote[c.finalRatingStatusKey] != mergedNote[c.currentLabelKey]: # Changed status vs. previous run: @@ -193,7 +208,7 @@ def check_flips(mergedStatuses: pd.DataFrame, noteSubset: c.NoteSubset) -> None: # Prune notes to unlocked notes. mergedStatuses = mergedStatuses[mergedStatuses[c.timestampMillisOfStatusLockKey].isna()] # Prune to note subset - print( + logger.info( f"Checking Flip Rate for note subset: {noteSubset.description} (unlocked only), with max new CRH churn: {noteSubset.maxNewCrhChurnRate}, and max old CRH churn: {noteSubset.maxOldCrhChurnRate}" ) if noteSubset.noteSet is not None: @@ -222,12 +237,12 @@ def _check_flips( # Validate that changes are within allowable bounds. smoothedNewNoteRatio = (len(newCrhNotes - oldCrhNotes)) / (len(oldCrhNotes) + smoothingCount) rawNewNoteRatio = (len(newCrhNotes - oldCrhNotes)) / len(oldCrhNotes) - print( + logger.info( f"Raw new note ratio: {rawNewNoteRatio}, smoothed new note ratio: {smoothedNewNoteRatio}. (newCrhNotes={len(newCrhNotes)}, oldCrhNotes={len(oldCrhNotes)}, delta={len(newCrhNotes - oldCrhNotes)}" ) smoothedOldNoteRatio = (len(oldCrhNotes - newCrhNotes)) / (len(oldCrhNotes) + smoothingCount) rawOldNoteRatio = (len(oldCrhNotes - newCrhNotes)) / len(oldCrhNotes) - print( + logger.info( f"Raw old note ratio: {rawOldNoteRatio}, smoothed old note ratio: {smoothedOldNoteRatio}. (newCrhNotes={len(newCrhNotes)}, oldCrhNotes={len(oldCrhNotes)}, delta={len(oldCrhNotes - newCrhNotes)}" ) diff --git a/sourcecode/scoring/post_selection_similarity.py b/sourcecode/scoring/post_selection_similarity.py index ca93d20f..3112ce4e 100644 --- a/sourcecode/scoring/post_selection_similarity.py +++ b/sourcecode/scoring/post_selection_similarity.py @@ -1,4 +1,5 @@ import gc +import logging from typing import Dict from . import constants as c @@ -7,6 +8,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.post_selection_similarity") +logger.setLevel(logging.INFO) + + class PostSelectionSimilarity: def __init__( self, @@ -240,11 +245,11 @@ def _get_pair_counts(ratings: pd.DataFrame, windowMillis: int = 1000 * 60 * 10) values = ratings[ [c.noteIdKey, c.createdAtMillisKey, c.raterParticipantIdKey, c.tweetIdKey] ].values - print(len(values)) + logger.info(len(values)) for i in range(len(values)): priorNote, priorTs, priorRater, priorTweet = values[i] if i == 0 or i == 1000 or i == 100000 or i % 5000000 == 0: - print(f"get_pair_counts i={i}") + logger.info(f"get_pair_counts i={i}") j = i + 1 while j < len(values): nextNote, nextTs, nextRater, nextTweet = values[j] diff --git a/sourcecode/scoring/process_data.py b/sourcecode/scoring/process_data.py index ef0409e8..4aaa963a 100644 --- a/sourcecode/scoring/process_data.py +++ b/sourcecode/scoring/process_data.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from io import StringIO +import logging import os from typing import Dict, List, Optional, Tuple @@ -12,6 +13,10 @@ from sklearn.pipeline import Pipeline +logger = logging.getLogger("birdwatch.process_data") +logger.setLevel(logging.INFO) + + def read_from_strings( notesStr: str, ratingsStr: str, noteStatusHistoryStr: str ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: @@ -92,9 +97,9 @@ def tsv_parser( usecols=useCols, ) if convertNAToNone: - print("Logging size effect of convertNAToNone") - print("Before conversion:") - print(get_df_info(data)) + logger.info("Logging size effect of convertNAToNone") + logger.info("Before conversion:") + logger.info(get_df_info(data)) # float types will be nan if missing; newer nullable types like "StringDtype" or "Int64Dtype" will by default # be pandas._libs.missing.NAType if missing. Set those to None and change the dtype back to object. for colname, coltype in mapping.items(): @@ -104,8 +109,8 @@ def tsv_parser( ): data[colname] = data[colname].astype(object) data.loc[pd.isna(data[colname]), colname] = None - print("After conversion:") - print(get_df_info(data)) + logger.info("After conversion:") + logger.info(get_df_info(data)) return data except (ValueError, IndexError) as e: raise ValueError(f"Invalid input: {e}") @@ -211,7 +216,7 @@ def read_from_tsv( header=headers, convertNAToNone=False, ) - noteStatusHistory[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] = np.nan + noteStatusHistory[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey] = np.nan assert len(noteStatusHistory.columns.values) == len(c.noteStatusHistoryTSVColumns) and all( noteStatusHistory.columns == c.noteStatusHistoryTSVColumns ), ( @@ -243,7 +248,7 @@ def _filter_misleading_notes( notes: pd.DataFrame, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, - logging: bool = True, + log: bool = True, ) -> pd.DataFrame: """ This function actually filters ratings (not notes), based on which notes they rate. @@ -256,7 +261,7 @@ def _filter_misleading_notes( notes (pd.DataFrame): _description_ ratings (pd.DataFrame): _description_ noteStatusHistory (pd.DataFrame): _description_ - logging (bool, optional): _description_. Defaults to True. + log (bool, optional): _description_. Defaults to True. Returns: pd.DataFrame: filtered ratings @@ -290,20 +295,20 @@ def _filter_misleading_notes( ratings[c.classificationKey] == c.noteSaysTweetIsNotMisleadingKey ) & (ratings[createdAtMillisNSHKey] > c.notMisleadingUILaunchTime) - if logging: - print( + if log: + logger.info( f"Preprocess Data: Filter misleading notes, starting with {len(ratings)} ratings on {len(np.unique(ratings[c.noteIdKey]))} notes" ) - print( + logger.info( f" Keeping {ratings[notDeletedMisleadingKey].sum()} ratings on {len(np.unique(ratings.loc[ratings[notDeletedMisleadingKey],c.noteIdKey]))} misleading notes" ) - print( + logger.info( f" Keeping {ratings[deletedButInNSHKey].sum()} ratings on {len(np.unique(ratings.loc[ratings[deletedButInNSHKey],c.noteIdKey]))} deleted notes that were previously scored (in note status history)" ) - print( + logger.info( f" Removing {notDeletedNotMisleadingOldUI.sum()} ratings on {len(np.unique(ratings.loc[notDeletedNotMisleadingOldUI, c.noteIdKey]))} older notes that aren't deleted, but are not-misleading." ) - print( + logger.info( f" Removing {deletedNotInNSH.sum()} ratings on {len(np.unique(ratings.loc[deletedNotInNSH, c.noteIdKey]))} notes that were deleted and not in note status history (e.g. old)." ) @@ -368,7 +373,7 @@ def preprocess_data( ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, shouldFilterNotMisleadingNotes: bool = True, - logging: bool = True, + log: bool = True, ratingsOnly: bool = False, ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: """Populate helpfulNumKey, a unified column that merges the helpfulness answers from @@ -382,7 +387,7 @@ def preprocess_data( ratings (pd.DataFrame) noteStatusHistory (pd.DataFrame) shouldFilterNotMisleadingNotes (bool, optional): Defaults to True. - logging (bool, optional): Defaults to True. + log (bool, optional): Defaults to True. ratingsOnly (bool, optional): Defaults to False Returns: @@ -390,15 +395,13 @@ def preprocess_data( ratings (pd.DataFrame) noteStatusHistory (pd.DataFrame) """ - if logging: - print( - "Timestamp of latest rating in data: ", - pd.to_datetime(ratings[c.createdAtMillisKey], unit="ms").max(), + if log: + logger.info( + f"Timestamp of latest rating in data: {pd.to_datetime(ratings[c.createdAtMillisKey], unit='ms').max()}", ) if not ratingsOnly: - print( - "Timestamp of latest note in data: ", - pd.to_datetime(notes[c.createdAtMillisKey], unit="ms").max(), + logger.info( + f"Timestamp of latest note in data: {pd.to_datetime(notes[c.createdAtMillisKey], unit='ms').max()}", ) ratings = remove_duplicate_ratings(ratings) @@ -421,10 +424,10 @@ def preprocess_data( noteStatusHistory = note_status_history.merge_note_info(noteStatusHistory, notes) if shouldFilterNotMisleadingNotes: - ratings = _filter_misleading_notes(notes, ratings, noteStatusHistory, logging) + ratings = _filter_misleading_notes(notes, ratings, noteStatusHistory, log) - if logging: - print( + if log: + logger.info( "Num Ratings: %d, Num Unique Notes Rated: %d, Num Unique Raters: %d" % ( len(ratings), @@ -439,7 +442,7 @@ def filter_ratings( ratings: pd.DataFrame, minNumRatingsPerRater: int, minNumRatersPerNote: int, - logging: bool = True, + log: bool = True, ) -> pd.DataFrame: """Apply min number of ratings for raters & notes. Instead of iterating these filters until convergence, simply stop after going back and force once. @@ -450,7 +453,7 @@ def filter_ratings( included in scoring. Raters with fewer ratings are removed. minNumRatersPerNote: Minimum number of ratings which a note must have to be included in scoring. Notes with fewer ratings are removed. - logging: Debug output. Defaults to True. + log: Debug output. Defaults to True. Returns: pd.DataFrame: filtered ratings @@ -470,11 +473,11 @@ def filter_raters(ratings): ratings = filter_raters(ratings) ratings = filter_notes(ratings) - if logging: + if log: # Log final details unique_notes = ratings[c.noteIdKey].nunique() unique_raters = ratings[c.raterParticipantIdKey].nunique() - print( + logger.info( f"After applying min {minNumRatingsPerRater} ratings per rater and min {minNumRatersPerNote} raters per note: \n" + f"Num Ratings: {len(ratings)}, Num Unique Notes Rated: {unique_notes}, Num Unique Raters: {unique_raters}" ) @@ -574,7 +577,7 @@ def __init__( userEnrollmentPath: str, headers: bool, shouldFilterNotMisleadingNotes: bool = True, - logging: bool = True, + log: bool = True, prescoringNoteModelOutputPath: Optional[str] = None, prescoringRaterModelOutputPath: Optional[str] = None, prescoringNoteTopicClassifierPath: Optional[str] = None, @@ -588,7 +591,7 @@ def __init__( userEnrollmentPath (str): file path headers: If true, expect first row of input files to be headers. shouldFilterNotMisleadingNotes (bool, optional): Throw out not-misleading notes if True. Defaults to True. - logging (bool, optional): Print out debug output. Defaults to True. + log (bool, optional): Print out debug output. Defaults to True. """ self.notesPath = notesPath self.ratingsPath = ratingsPath @@ -600,7 +603,7 @@ def __init__( self.prescoringMetaOutputPath = prescoringMetaOutputPath self.headers = headers self.shouldFilterNotMisleadingNotes = shouldFilterNotMisleadingNotes - self.logging = logging + self.log = log def get_data(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: """All-in-one function for reading Birdwatch notes and ratings from TSV files. @@ -617,14 +620,14 @@ def get_data(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFra self.headers, ) notes, ratings, noteStatusHistory = preprocess_data( - notes, ratings, noteStatusHistory, self.shouldFilterNotMisleadingNotes, self.logging + notes, ratings, noteStatusHistory, self.shouldFilterNotMisleadingNotes, self.log ) return notes, ratings, noteStatusHistory, userEnrollment def get_prescoring_model_output( self, ) -> Tuple[pd.DataFrame, pd.DataFrame, Pipeline, c.PrescoringMetaOutput]: - print( + logger.info( f"Attempting to read prescoring model output from {self.prescoringNoteModelOutputPath}, {self.prescoringRaterModelOutputPath}, {self.prescoringNoteTopicClassifierPath}, {self.prescoringMetaOutputPath}" ) if self.prescoringRaterModelOutputPath is None: @@ -702,7 +705,7 @@ def filter_input_data_for_testing( Returns: notes, ratings, prescoringNotesInput, prescoringRatingsInput """ - print( + logger.info( f"""Called filter_input_data_for_testing. Notes: {len(notes)}, Ratings: {len(ratings)}. Max note createdAt: {pd.to_datetime(notes[c.createdAtMillisKey].max(), unit='ms')}; Max rating createAt: {pd.to_datetime(ratings[c.createdAtMillisKey].max(), unit='ms')}""" ) @@ -710,7 +713,7 @@ def filter_input_data_for_testing( notes, ratings = filter_notes_and_ratings_after_particular_timestamp_millis( notes, ratings, cutoffTimestampMillis ) - print( + logger.info( f"""After filtering notes and ratings after particular timestamp (={cutoffTimestampMillis}). Notes: {len(notes)}, Ratings: {len(ratings)}. Max note createdAt: {pd.to_datetime(notes[c.createdAtMillisKey].max(), unit='ms')}; Max rating createAt: {pd.to_datetime(ratings[c.createdAtMillisKey].max(), unit='ms')}""" ) @@ -721,7 +724,7 @@ def filter_input_data_for_testing( excludeRatingsAfterANoteGotFirstStatusPlusNHours, daysInPastToApplyPostFirstStatusFiltering, ) - print( + logger.info( f"""After filtering ratings after first status (plus {excludeRatingsAfterANoteGotFirstStatusPlusNHours} hours) for notes created in last {daysInPastToApplyPostFirstStatusFiltering} days. Notes: {len(notes)}, Ratings: {len(ratings)}. Max note createdAt: {pd.to_datetime(notes[c.createdAtMillisKey].max(), unit='ms')}; Max rating createAt: {pd.to_datetime(ratings[c.createdAtMillisKey].max(), unit='ms')}""" ) @@ -732,7 +735,7 @@ def filter_input_data_for_testing( ) = filter_prescoring_input_to_simulate_delay_in_hours( notes, ratings, filterPrescoringInputToSimulateDelayInHours ) - print( + logger.info( f"""After filtering prescoring notes and ratings to simulate a delay of {filterPrescoringInputToSimulateDelayInHours} hours: Notes: {len(prescoringNotesInput)}, Ratings: {len(prescoringRatingsInput)}. Max note createdAt: {pd.to_datetime(prescoringNotesInput[c.createdAtMillisKey].max(), unit='ms')}; Max rating createAt: {pd.to_datetime(prescoringRatingsInput[c.createdAtMillisKey].max(), unit='ms')}""" ) @@ -759,7 +762,7 @@ def filter_ratings_after_first_status_plus_n_hours( millisToLookBack = daysInPastToApplyPostFirstStatusFiltering * 24 * 60 * 60 * 1000 cutoffTimeMillis = noteStatusHistory[c.createdAtMillisKey].max() - millisToLookBack nshToFilter = noteStatusHistory[noteStatusHistory[c.createdAtMillisKey] > cutoffTimeMillis] - print( + logger.info( f" Notes to apply the post-first-status filter for (from last {daysInPastToApplyPostFirstStatusFiltering} days): {len(nshToFilter)}" ) nshToFilter[ratingCutoffTimeMillisKey] = nshToFilter[ @@ -802,7 +805,7 @@ def filter_prescoring_input_to_simulate_delay_in_hours( cutoffMillis = latestRatingMillis - ( filterPrescoringInputToSimulateDelayInHours * 60 * 60 * 1000 ) - print( + logger.info( f""" Filtering input data for prescoring to simulate running prescoring earlier than final scoring. Latest rating timestamp: {pd.to_datetime(latestRatingMillis, unit='ms')} diff --git a/sourcecode/scoring/reputation_matrix_factorization/diligence_model.py b/sourcecode/scoring/reputation_matrix_factorization/diligence_model.py index be32375f..52f6f247 100644 --- a/sourcecode/scoring/reputation_matrix_factorization/diligence_model.py +++ b/sourcecode/scoring/reputation_matrix_factorization/diligence_model.py @@ -1,3 +1,4 @@ +import logging from typing import Optional, Tuple from .. import constants as c @@ -12,6 +13,10 @@ import torch +logger = logging.getLogger("birdwatch.diligence_model") +logger.setLevel(logging.INFO) + + def _setup_dataset_and_hparams( filteredRatings: pd.DataFrame, device=torch.device("cpu"), @@ -120,7 +125,7 @@ def fit_low_diligence_model_final( globalInterceptInit=globalInterceptDiligence, device=device, ) - print(f"Low diligence final loss: {loss_final:.4f}") + logger.info(f"Low diligence final loss: {loss_final:.4f}") noteStats = pd.DataFrame( { @@ -170,7 +175,7 @@ def fit_low_diligence_model_prescoring( raterInitState=raterInitStateInternal, device=device, ) - print(f"Low diligence training loss: {loss1:.4f}, {loss2:.4f}, {loss3:.4f}") + logger.info(f"Low diligence training loss: {loss1:.4f}, {loss2:.4f}, {loss3:.4f}") noteStats = pd.DataFrame( { diff --git a/sourcecode/scoring/reputation_matrix_factorization/helpfulness_model.py b/sourcecode/scoring/reputation_matrix_factorization/helpfulness_model.py index bcd903b4..f19f4453 100644 --- a/sourcecode/scoring/reputation_matrix_factorization/helpfulness_model.py +++ b/sourcecode/scoring/reputation_matrix_factorization/helpfulness_model.py @@ -1,3 +1,4 @@ +import logging from typing import Optional, Tuple from .. import constants as c @@ -12,6 +13,10 @@ import torch +logger = logging.getLogger("birdwatch.helpfulness_model") +logger.setLevel(logging.INFO) + + def _setup_dataset_and_hparams( filteredRatings: pd.DataFrame, device=torch.device("cpu"), @@ -85,7 +90,7 @@ def get_helpfulness_reputation_results_final( globalInterceptInit=globalIntercept, device=device, ) - print(f"Helpfulness reputation loss: {loss:.4f}") + logger.info(f"Helpfulness reputation loss: {loss:.4f}") # Compose and return DataFrames noteStats = pd.DataFrame( @@ -131,7 +136,7 @@ def get_helpfulness_reputation_results_prescoring( raterInitState=raterInitState, device=device, ) - print(f"Helpfulness reputation loss: {loss1:.4f}, {loss2:.4f}, {loss3:.4f}") + logger.info(f"Helpfulness reputation loss: {loss1:.4f}, {loss2:.4f}, {loss3:.4f}") # Compose and return DataFrames noteStats = pd.DataFrame( diff --git a/sourcecode/scoring/reputation_matrix_factorization/reputation_matrix_factorization.py b/sourcecode/scoring/reputation_matrix_factorization/reputation_matrix_factorization.py index 58102a39..6e9a8617 100644 --- a/sourcecode/scoring/reputation_matrix_factorization/reputation_matrix_factorization.py +++ b/sourcecode/scoring/reputation_matrix_factorization/reputation_matrix_factorization.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +import logging import time from typing import Optional @@ -11,6 +12,10 @@ import torch.nn as nn +logger = logging.getLogger("birdwatch.reputation_matrix_factorization") +logger.setLevel(logging.INFO) + + # Define dataclass to represent learning hyperparameters @dataclass class ReputationModelHyperparameters: @@ -120,8 +125,8 @@ def __init__( def init_parameter(self, initDf, initCol, idKey, ratersOrNotes, device, defaultValue): if initDf is not None and initCol in initDf.columns: idToInitValue = dict(initDf[[idKey, initCol]].values) - print(f"Initializing {initCol}:") - print( + logger.info(f"Initializing {initCol}:") + logger.info( f" num in dataset: {ratersOrNotes.shape[0]}, vs. num we are initializing: {len(initDf)}" ) paramWeightToInit = nn.Parameter( @@ -135,11 +140,11 @@ def init_parameter(self, initDf, initCol, idKey, ratersOrNotes, device, defaultV .reshape(-1, 1) .to(device) ) - print(f" uninitialized {initCol}s: {(paramWeightToInit == 0).flatten().sum()}") - print(f" initialized {initCol}s: {(paramWeightToInit != 0).flatten().sum()}") + logger.info(f" uninitialized {initCol}s: {(paramWeightToInit == 0).flatten().sum()}") + logger.info(f" initialized {initCol}s: {(paramWeightToInit != 0).flatten().sum()}") return paramWeightToInit else: - print(f"Not initializing {initCol}") + logger.info(f"Not initializing {initCol}") return None def init_note_factor(self, noteInitState, dataset, device, defaultValue=0): @@ -292,11 +297,13 @@ def _train_one_round(model, loss_fn, dataset, hParams): loss += model.get_regularization_loss(numRatings) assert not torch.isnan(loss).any() if hParams.logRate and epoch % hParams.logRate == 0: - print(f"epoch={epoch:03d} | loss={loss.item():7.6f} | time={time.time() - start:.1f}s") + logger.info(f"epoch={epoch:03d} | loss={loss.item():7.6f} | time={time.time() - start:.1f}s") if hParams.convergence > 0 and epoch % hParams.stablePeriod == 0: if priorLoss is not None and (priorLoss - loss).abs() < hParams.convergence: if hParams.logRate: - print(f"epoch={epoch:03d} | loss={loss.item():7.6f} | time={time.time() - start:.1f}s") + logger.info( + f"epoch={epoch:03d} | loss={loss.item():7.6f} | time={time.time() - start:.1f}s" + ) break priorLoss = loss # Perform backward pass @@ -336,7 +343,9 @@ def _setup_model( assert hParams.lossFunction == "BCEWithLogitsLoss" loss_fn = nn.BCEWithLogitsLoss(reduction="none") - print(f"Setup model: noteInitState: \n{noteInitState},\n raterInitState: \n{raterInitState}") + logger.info( + f"Setup model: noteInitState: \n{noteInitState},\n raterInitState: \n{raterInitState}" + ) model = ReputationMFModel( dataset, activation_fn=activation_fn, @@ -366,8 +375,8 @@ def train_model_prescoring( ): model, loss_fn = _setup_model(dataset, hParams, noteInitState, raterInitState) - print("Reputation Matrix Factorization: rater reputation frozen") - print("Round 1:") + logger.info("Reputation Matrix Factorization: rater reputation frozen") + logger.info("Round 1:") loss_fn_1 = WeightedLoss( loss_fn, dataset.noteTensor, @@ -380,10 +389,10 @@ def train_model_prescoring( ) model.raterReputation.requires_grad_(False) loss1 = _train_one_round(model, loss_fn_1, dataset, hParams) - print(f"After round 1, global bias: {model.globalBias}") + logger.info(f"After round 1, global bias: {model.globalBias}") globalInt1 = model.globalBias.data.cpu().detach().numpy().item() - print("\nRound 2: learn rater rep (and everything else), freeze note intercept") + logger.info("\nRound 2: learn rater rep (and everything else), freeze note intercept") loss_fn_2 = WeightedLoss( loss_fn, dataset.noteTensor, @@ -402,7 +411,7 @@ def train_model_prescoring( noteIntercept2 = model.noteBias.weight.cpu().flatten().detach().numpy().copy() raterIntercept2 = model.raterBias.weight.cpu().flatten().detach().numpy().copy() - print("\nRound 3: fit intercepts and global intercept with everything else frozen") + logger.info("\nRound 3: fit intercepts and global intercept with everything else frozen") model.l2Lambda = hParams.l2Lambda * hParams.l2LambdaThirdRoundMultiplier model.l2NoteBiasMultiplier = hParams.l2NoteBiasMultiplier * hParams.l2NoteBiasThirdRoundMultiplier model.noteBias.requires_grad_(True) @@ -427,7 +436,7 @@ def train_model_prescoring( loss3 = _train_one_round(model, loss_fn_3, dataset, hParams) - print(f"After round 3, global bias: {model.globalBias}") + logger.info(f"After round 3, global bias: {model.globalBias}") globalInt3 = model.globalBias.data.cpu().detach().numpy().item() globalIntercept = c.ReputationGlobalIntercept( firstRound=globalInt1, secondRound=globalInt2, finalRound=globalInt3 @@ -464,7 +473,7 @@ def train_model_final( dataset, hParams, noteInitState, raterInitState, globalInterceptInit.secondRound ) - print( + logger.info( "Final scoring, initial round fitting reputation MF (equivalent to Round 2 in Prescoring - learn note factor)" ) @@ -487,7 +496,7 @@ def train_model_final( ) _train_one_round(model, loss_fn_2, dataset, hParams) - print("Final scoring, final round fitting reputation MF: learn just note intercept") + logger.info("Final scoring, final round fitting reputation MF: learn just note intercept") # Now set the global intercept to the value from the final round model.globalBias.data = torch.tensor(globalInterceptInit.finalRound, **model.format) diff --git a/sourcecode/scoring/reputation_scorer.py b/sourcecode/scoring/reputation_scorer.py index 4833d118..31af5439 100644 --- a/sourcecode/scoring/reputation_scorer.py +++ b/sourcecode/scoring/reputation_scorer.py @@ -1,3 +1,4 @@ +import logging from typing import Dict, List, Optional, Tuple from . import constants as c @@ -14,6 +15,10 @@ import torch +logger = logging.getLogger("birdwatch.reputation_scorer") +logger.setLevel(logging.INFO) + + class ReputationScorer(Scorer): """Applies reputation matrix factorization to helpfulness bridging.""" @@ -105,7 +110,7 @@ def _prescore_notes_and_users( self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollmentRaw: pd.DataFrame ) -> Tuple[pd.DataFrame, pd.DataFrame, c.PrescoringMetaScorerOutput]: if self._seed is not None: - print(f"seeding with {self._seed}") + logger.info(f"seeding with {self._seed}") torch.manual_seed(self._seed) ratings = filter_ratings(ratings, self._minNumRatingsPerRater, self._minNumRatersPerNote) # Calculate initialization factors if necessary @@ -129,7 +134,7 @@ def _prescore_notes_and_users( # Fill in NaN values for any missing notes noteStats = noteStats.merge(noteStatusHistory[[c.noteIdKey]].drop_duplicates(), how="outer") assert len(noteStats) == len(noteStatusHistory) - print( + logger.info( f"""Reputation prescoring: returning these columns: noteStats: {noteStats.columns} raterStats: {raterStats.columns} @@ -155,7 +160,7 @@ def _score_notes_and_users( prescoringMetaScorerOutput: c.PrescoringMetaScorerOutput, ) -> Tuple[pd.DataFrame, pd.DataFrame]: if self._seed is not None: - print(f"seeding with {self._seed}") + logger.info(f"seeding with {self._seed}") torch.manual_seed(self._seed) ratings = filter_ratings(ratings, self._minNumRatingsPerRater, self._minNumRatersPerNote) if len(ratings) == 0: diff --git a/sourcecode/scoring/run_scoring.py b/sourcecode/scoring/run_scoring.py index a2bd35b6..a74bcc44 100644 --- a/sourcecode/scoring/run_scoring.py +++ b/sourcecode/scoring/run_scoring.py @@ -9,6 +9,7 @@ import gc import io from itertools import chain +import logging import multiprocessing from multiprocessing import shared_memory # type: ignore import time @@ -51,6 +52,10 @@ import sklearn +logger = logging.getLogger("birdwatch.run_scoring") +logger.setLevel(logging.INFO) + + def _get_scorers( seed: Optional[int], pseudoraters: Optional[bool], @@ -124,8 +129,6 @@ def _get_scorers( ] scorers[Scorers.MFMultiGroupScorer] = [ MFMultiGroupScorer(includedGroups={4, 5, 7, 12, 26}, groupId=1, threads=4, seed=seed), - MFMultiGroupScorer(includedGroups={24, 27}, groupId=2, threads=4, seed=seed), - MFMultiGroupScorer(includedGroups={16, 20, 22, 23, 28}, groupId=3, threads=4, seed=seed), ] return scorers @@ -264,17 +267,17 @@ def _run_scorer_parallelizable( scoringArgs = copy.deepcopy(scoringArgs) if scoringArgsSharedMemory is not None: - print( + logger.info( f"{scorer.get_name()} run_scorer_parallelizable just started in parallel: loading data from shared memory." ) scoringArgs = _load_data_from_shared_memory_parallelizable( scoringArgsSharedMemory, scoringArgs ) - print( + logger.info( f"{scorer.get_name()} run_scorer_parallelizable just finished loading data from shared memory." ) elif dataLoader is not None: - print( + logger.info( f"{scorer.get_name()} run_scorer_parallelizable just started in parallel: loading data with dataLoader." ) scoringArgs = _load_data_with_data_loader_parallelizable(dataLoader, scoringArgs) @@ -398,6 +401,7 @@ def _run_scorers( """ # Apply scoring algorithms overallStartTime = time.perf_counter() + if runParallel: shms, scoringArgsSharedMemory = _save_dfs_to_shared_memory(scoringArgs) @@ -405,7 +409,7 @@ def _run_scorers( mp_context=multiprocessing.get_context("fork"), max_workers=maxWorkers, ) as executor: - print(f"Starting parallel scorer execution with {len(scorers)} scorers.") + logger.info(f"Starting parallel scorer execution with {len(scorers)} scorers.") # Pass mostly-empty scoringArgs: the data is too large to be copied in-memory to # each process, so must be re-loaded from disk by every scorer's dataLoader. scoringArgs.remove_large_args_for_multiprocessing() @@ -438,7 +442,7 @@ def _run_scorers( modelResultsTuple, scorerTimesTuple = zip(*modelResultsAndTimes) overallTime = time.perf_counter() - overallStartTime - print( + logger.info( f"""---- Completed individual scorers. Ran in parallel: {runParallel}. Succeeded in {overallTime:.2f} seconds. Individual scorers: (name, runtime): {list(zip( @@ -613,7 +617,7 @@ def meta_score( auxiliaryNoteInfo: pd.DataFrame, noteStatusHistory: pd.DataFrame, enabledScorers: Optional[Set[Scorers]], - enableNmrDueToMinStableCrhTime: bool = False, + enableNmrDueToMinStableCrhTime: bool = True, ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Determine final note status based on individual scoring results. @@ -681,16 +685,15 @@ def meta_score( ) ) if enabledScorers is None or Scorers.MFMultiGroupScorer in enabledScorers: - for i in range(1, 4): - rules.append( - scoring_rules.ApplyModelResult( - RuleID[f"MULTI_GROUP_MODEL_{i}"], - {RuleID.CORE_MODEL}, - c.multiGroupRatingStatusKey, - checkFirmReject=True, - filterColumnPairs=[(c.modelingMultiGroupKey, i)], - ) + rules.append( + scoring_rules.ApplyModelResult( + RuleID["MULTI_GROUP_MODEL_1"], + {RuleID.CORE_MODEL}, + c.multiGroupRatingStatusKey, + checkFirmReject=True, + filterColumnPairs=[(c.modelingMultiGroupKey, 1)], ) + ) if enabledScorers is None or Scorers.MFGroupScorer in enabledScorers: # TODO: modify this code to work when MFExpansionScorer is disabled by the system test assert len(scorers[Scorers.MFCoreScorer]) == 1 @@ -783,12 +786,12 @@ def meta_score( ) blockedRows = coreRejects | (scoringResult[c.coreRatingStatusKey].isna() & expansionRejects) crhRows = scoringResult[c.finalRatingStatusKey] == c.currentlyRatedHelpful - print("Summary of blocked and CRH rows:") + logger.info("Summary of blocked and CRH rows:") # TODO: validate that these are all due to ScoringDriftGuard and change to an assert - print( + logger.info( scoringResult[blockedRows & crhRows][c.metaScorerActiveRulesKey].value_counts(dropna=False) ) - print(scoringResult[blockedRows & crhRows][c.decidedByKey].value_counts(dropna=False)) + logger.info(scoringResult[blockedRows & crhRows][c.decidedByKey].value_counts(dropna=False)) with c.time_block("Post-scorers: Meta Score: Preparing Return Values"): scoredNotesCols = scoringResult[ [ @@ -1043,16 +1046,16 @@ def run_prescoring( useStableInitialization: bool = True, pseudoraters: bool = True, checkFlips: bool = True, - enableNmrDueToMinStableCrhTime: bool = False, + enableNmrDueToMinStableCrhTime: bool = True, previousRatingCutoffTimestampMillis: Optional[int] = None, ) -> Tuple[ pd.DataFrame, pd.DataFrame, sklearn.pipeline.Pipeline, c.PrescoringMetaOutput, pd.DataFrame ]: with c.time_block("Logging Prescoring Inputs Initial RAM usage"): - print(get_df_info(notes, "notes")) - print(get_df_info(ratings, "ratings")) - print(get_df_info(noteStatusHistory, "noteStatusHistory")) - print(get_df_info(userEnrollment, "userEnrollment")) + logger.info(get_df_info(notes, "notes")) + logger.info(get_df_info(ratings, "ratings")) + logger.info(get_df_info(noteStatusHistory, "noteStatusHistory")) + logger.info(get_df_info(userEnrollment, "userEnrollment")) with c.time_block("Note Topic Assignment"): topicModel = TopicModel() noteTopicClassifierPipe, seedLabels, conflictedTexts = topicModel.train_note_topic_classifier( @@ -1065,11 +1068,11 @@ def run_prescoring( with c.time_block("Compute Post Selection Similarity"): pss = PostSelectionSimilarity(notes, ratings) postSelectionSimilarityValues = pss.get_post_selection_similarity_values() - print(f"Post Selection Similarity Prescoring: begin with {len(ratings)} ratings.") + logger.info(f"Post Selection Similarity Prescoring: begin with {len(ratings)} ratings.") ratings = filter_ratings_by_post_selection_similarity( notes, ratings, postSelectionSimilarityValues ) - print(f"Post Selection Similarity Prescoring: {len(ratings)} ratings remaining.") + logger.info(f"Post Selection Similarity Prescoring: {len(ratings)} ratings remaining.") del pss gc.collect() @@ -1092,15 +1095,17 @@ def run_prescoring( noteStatusHistory[c.noteAuthorParticipantIdKey] = noteStatusHistoryIds userEnrollment[c.participantIdKey] = userEnrollmentIds del ratingIds, noteStatusHistoryIds, userEnrollmentIds - print("User IDs for ratings, noteStatusHistory and userEnrollment converted to Int64Dtype.") + logger.info( + "User IDs for ratings, noteStatusHistory and userEnrollment converted to Int64Dtype." + ) conversion = True except ValueError as e: - print(f"Error converting user IDs to ints. IDs will remain as strings. {repr(e)}") + logger.info(f"Error converting user IDs to ints. IDs will remain as strings. {repr(e)}") with c.time_block("Logging Prescoring Inputs RAM usage before _run_scorers"): - print(get_df_info(notes, "notes")) - print(get_df_info(ratings, "ratings")) - print(get_df_info(noteStatusHistory, "noteStatusHistory")) - print(get_df_info(userEnrollment, "userEnrollment")) + logger.info(get_df_info(notes, "notes")) + logger.info(get_df_info(ratings, "ratings")) + logger.info(get_df_info(noteStatusHistory, "noteStatusHistory")) + logger.info(get_df_info(userEnrollment, "userEnrollment")) prescoringModelResultsFromAllScorers = _run_scorers( scorers=list(chain(*scorers.values())), scoringArgs=PrescoringArgs( @@ -1126,15 +1131,15 @@ def run_prescoring( gc.collect() with c.time_block("Logging Prescoring Results RAM usage (before conversion)"): - print(get_df_info(notes, "notes")) - print(get_df_info(ratings, "ratings")) - print(get_df_info(noteStatusHistory, "noteStatusHistory")) - print(get_df_info(userEnrollment, "userEnrollment")) - print(get_df_info(prescoringNoteModelOutput, "prescoringNoteModelOutput")) - print(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) + logger.info(get_df_info(notes, "notes")) + logger.info(get_df_info(ratings, "ratings")) + logger.info(get_df_info(noteStatusHistory, "noteStatusHistory")) + logger.info(get_df_info(userEnrollment, "userEnrollment")) + logger.info(get_df_info(prescoringNoteModelOutput, "prescoringNoteModelOutput")) + logger.info(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) # Restore IDs as string objects now that prescoring is over and memory pressure is relaxed. if conversion: - print("Restoring string IDs.") + logger.info("Restoring string IDs.") ratings[c.raterParticipantIdKey] = ratings[c.raterParticipantIdKey].astype(str) noteStatusHistory[c.noteAuthorParticipantIdKey] = noteStatusHistory[ c.noteAuthorParticipantIdKey @@ -1144,15 +1149,15 @@ def run_prescoring( prescoringRaterModelOutput[c.raterParticipantIdKey] = prescoringRaterModelOutput[ c.raterParticipantIdKey ].astype(str) - print("Restoration of original string IDs complete.") + logger.info("Restoration of original string IDs complete.") with c.time_block("Logging Prescoring Results RAM usage (after conversion)"): - print(get_df_info(notes, "notes")) - print(get_df_info(ratings, "ratings")) - print(get_df_info(noteStatusHistory, "noteStatusHistory")) - print(get_df_info(userEnrollment, "userEnrollment")) - print(get_df_info(prescoringNoteModelOutput, "prescoringNoteModelOutput")) - print(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) + logger.info(get_df_info(notes, "notes")) + logger.info(get_df_info(ratings, "ratings")) + logger.info(get_df_info(noteStatusHistory, "noteStatusHistory")) + logger.info(get_df_info(userEnrollment, "userEnrollment")) + logger.info(get_df_info(prescoringNoteModelOutput, "prescoringNoteModelOutput")) + logger.info(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) prescoringRaterModelOutput = pd.concat( [prescoringRaterModelOutput, postSelectionSimilarityValues], @@ -1161,7 +1166,7 @@ def run_prescoring( }, ) with c.time_block("Logging Prescoring Results RAM usage (after concatenation)"): - print(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) + logger.info(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) # Prescoring itself is now done. We will not run final_note_scoring to check note status flips. if checkFlips: @@ -1253,7 +1258,7 @@ def determine_which_notes_to_rescore( notesWithNewRatings = set( ratings.loc[ratings[c.createdAtMillisKey] > previousRatingCutoffTimestampMillis, c.noteIdKey] ) - print( + logger.info( f"1. Num notes with new ratings since last scoring run (ts: {previousRatingCutoffTimestampMillis}): {len(notesWithNewRatings)}" ) notesToRescoreSet.update(notesWithNewRatings) @@ -1282,8 +1287,8 @@ def determine_which_notes_to_rescore( newNotesNotRescoredRecentlyEnough = set( noteStatusHistory.loc[noteCreatedRecently & noteNotRescoredRecently, c.noteIdKey] ) - print("2. Rescore all recently created notes if not rescored at the minimum frequency.") - print("Num notes created recently:", noteCreatedRecently.sum()) + logger.info("2. Rescore all recently created notes if not rescored at the minimum frequency.") + logger.info(f"Num notes created recently: {noteCreatedRecently.sum()}") # Remove notes with new ratings from this set. newNotesNotRescoredRecentlyEnough = newNotesNotRescoredRecentlyEnough.difference( notesWithNewRatings @@ -1310,8 +1315,8 @@ def determine_which_notes_to_rescore( c.noteIdKey, ] ).difference(notesWithNewRatings) - print( - "3. Rescore all notes that flipped status in the previous scoring run.", len(justFlippedNotes) + logger.info( + f"3. Rescore all notes that flipped status in the previous scoring run. {len(justFlippedNotes)}" ) notesToRescoreSet.update(justFlippedNotes) noteSubsets.append( @@ -1336,9 +1341,9 @@ def determine_which_notes_to_rescore( noteStatusHistory[c.timestampMillisOfNoteCurrentLabelKey] < currentMillis - scoreRecentlyFlippedNotesMinimumFrequencyMillis ) - print("4. Rescore all recently-flipped notes if not rescored at the minimum frequency.") - print("Num notes flipped recently:", noteFlippedRecently.sum()) - print("Num notes not rescored recently enough:", noteNotRescoredRecently.sum()) + logger.info("4. Rescore all recently-flipped notes if not rescored at the minimum frequency.") + logger.info(f"Num notes flipped recently: {noteFlippedRecently.sum()}") + logger.info(f"Num notes not rescored recently enough: {noteNotRescoredRecently.sum()}") recentlyFlippedNotesNotRescoredRecentlyEnough = set( noteStatusHistory.loc[noteFlippedRecently & noteNotRescoredRecently, c.noteIdKey] ) @@ -1365,9 +1370,8 @@ def determine_which_notes_to_rescore( c.noteIdKey, ] ) - print( - "5. Rescore all notes that were NMRed due to MinStableCrhTime was not met.", - len(nmrDueToMinStableCrhTimeNotes), + logger.info( + f"5. Rescore all notes that were NMRed due to MinStableCrhTime was not met. {len(nmrDueToMinStableCrhTimeNotes)}" ) notesToRescoreSet.update(nmrDueToMinStableCrhTimeNotes) noteSubsets.append( @@ -1379,7 +1383,7 @@ def determine_which_notes_to_rescore( ) ) - print( + logger.info( f"""----\nNotes to rescore: * {len(notesWithNewRatings)} notes with new ratings since last scoring run. * {len(newNotesNotRescoredRecentlyEnough)} notes created recently and not rescored recently enough. @@ -1412,18 +1416,18 @@ def run_final_note_scoring( previousScoredNotes: Optional[pd.DataFrame] = None, previousAuxiliaryNoteInfo: Optional[pd.DataFrame] = None, previousRatingCutoffTimestampMillis: Optional[int] = 0, - enableNmrDueToMinStableCrhTime: bool = False, + enableNmrDueToMinStableCrhTime: bool = True, ): with c.time_block("Logging Final Scoring RAM usage"): - print(get_df_info(notes, "notes")) - print(get_df_info(ratings, "ratings")) - print(get_df_info(noteStatusHistory, "noteStatusHistory")) - print(get_df_info(userEnrollment, "userEnrollment")) - print(get_df_info(prescoringNoteModelOutput, "prescoringNoteModelOutput")) - print(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) + logger.info(get_df_info(notes, "notes")) + logger.info(get_df_info(ratings, "ratings")) + logger.info(get_df_info(noteStatusHistory, "noteStatusHistory")) + logger.info(get_df_info(userEnrollment, "userEnrollment")) + logger.info(get_df_info(prescoringNoteModelOutput, "prescoringNoteModelOutput")) + logger.info(get_df_info(prescoringRaterModelOutput, "prescoringRaterModelOutput")) with c.time_block("Determine which notes to score."): if previousScoredNotes is None: - print("No previous scored notes passed; scoring all notes.") + logger.info("No previous scored notes passed; scoring all notes.") notesToRescoreSet: Set[int] = set() scoredNotesPassthrough = None currentMillis = int(time.time() * 1000) @@ -1472,7 +1476,7 @@ def run_final_note_scoring( else: assert previousAuxiliaryNoteInfo is not None assert previousRatingCutoffTimestampMillis is not None - print("Previous scored notes passed; determining which notes to rescore.") + logger.info("Previous scored notes passed; determining which notes to rescore.") # Filter all datasets to smaller versions which only contain notes which need to be scored. noteSubsets, notesToRescoreSet = determine_which_notes_to_rescore( notes, ratings, noteStatusHistory, previousRatingCutoffTimestampMillis @@ -1488,7 +1492,7 @@ def run_final_note_scoring( ~noteStatusHistory[c.noteIdKey].isin(notesToRescoreSet) ] - print( + logger.info( f"Rescoring {len(notesToRescoreSet)} notes, out of {len(notes)} total. Original number of ratings: {len(ratings)}" ) @@ -1500,7 +1504,7 @@ def run_final_note_scoring( prescoringNoteModelOutput[c.noteIdKey].isin(notesToRescoreSet) ] - print(f"Ratings on notes to rescore: {len(ratings)}") + logger.info(f"Ratings on notes to rescore: {len(ratings)}") with c.time_block("Preprocess smaller dataset since we skipped preprocessing at read time"): notes, ratings, noteStatusHistory = preprocess_data(notes, ratings, noteStatusHistory) @@ -1510,7 +1514,7 @@ def run_final_note_scoring( noteTopics = topicModel.get_note_topics(notes, noteTopicClassifier) with c.time_block("Post Selection Similarity: Final Scoring"): - print(f"Post Selection Similarity Final Scoring: begin with {len(ratings)} ratings.") + logger.info(f"Post Selection Similarity Final Scoring: begin with {len(ratings)} ratings.") ratings = filter_ratings_by_post_selection_similarity( notes, ratings, @@ -1518,7 +1522,7 @@ def run_final_note_scoring( [c.raterParticipantIdKey, c.postSelectionValueKey] ], ) - print(f"Post Selection Similarity Final Scoring: {len(ratings)} ratings remaining.") + logger.info(f"Post Selection Similarity Final Scoring: {len(ratings)} ratings remaining.") scorers = _get_scorers(seed, pseudoraters, useStableInitialization=useStableInitialization) @@ -1596,7 +1600,7 @@ def post_note_scoring( enabledScorers: Optional[Set[Scorers]] = None, strictColumns: bool = True, checkFlips: bool = True, - enableNmrDueToMinStableCrhTime: bool = False, + enableNmrDueToMinStableCrhTime: bool = True, ): """ Apply individual scoring models and obtained merged result. @@ -1674,7 +1678,9 @@ def post_note_scoring( scoredNotes, newNoteStatusHistory, auxiliaryNoteInfo ) - print(f"Meta scoring elapsed time: {((time.time() - postScoringStartTime)/60.0):.2f} minutes.") + logger.info( + f"Meta scoring elapsed time: {((time.time() - postScoringStartTime)/60.0):.2f} minutes." + ) return scoredNotes, newNoteStatusHistory, auxiliaryNoteInfo @@ -1775,7 +1781,7 @@ def run_scoring( previousRatingCutoffTimestampMillis=previousRatingCutoffTimestampMillis, ) - print("We invoked run_scoring and are now in between prescoring and scoring.") + logger.info("We invoked run_scoring and are now in between prescoring and scoring.") if writePrescoringScoringOutputCallback is not None: with c.time_block("Writing prescoring output."): writePrescoringScoringOutputCallback( @@ -1785,7 +1791,7 @@ def run_scoring( prescoringMetaOutput, prescoringScoredNotes, ) - print("Starting final scoring") + logger.info("Starting final scoring") scoredNotes, newNoteStatusHistory, auxiliaryNoteInfo = run_final_note_scoring( notes=notes, @@ -1809,7 +1815,7 @@ def run_scoring( previousRatingCutoffTimestampMillis=previousRatingCutoffTimestampMillis, ) - print("Starting contributor scoring") + logger.info("Starting contributor scoring") helpfulnessScores = run_contributor_scoring( ratings=ratings, diff --git a/sourcecode/scoring/runner.py b/sourcecode/scoring/runner.py index 9dd359a8..f37d7488 100644 --- a/sourcecode/scoring/runner.py +++ b/sourcecode/scoring/runner.py @@ -1,4 +1,5 @@ import argparse +import logging import os import sys @@ -11,6 +12,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.runner") +logger.setLevel(logging.INFO) + + def parse_args(): parser = argparse.ArgumentParser("Community Notes Scoring") parser.add_argument( @@ -243,8 +248,8 @@ def main( ): if args is None: args = parse_args() - print(f"scorer python version: {sys.version}") - print(f"scorer pandas version: {pd.__version__}") + logger.info(f"scorer python version: {sys.version}") + logger.info(f"scorer pandas version: {pd.__version__}") # patch_pandas requires that args are available (which matches the production binary) so # we first parse the arguments then invoke the decorated _run_scorer. return _run_scorer(args=args, dataLoader=dataLoader, extraScoringArgs=extraScoringArgs) diff --git a/sourcecode/scoring/scorer.py b/sourcecode/scoring/scorer.py index 4a3c2977..7a0a6d41 100644 --- a/sourcecode/scoring/scorer.py +++ b/sourcecode/scoring/scorer.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from contextlib import contextmanager import gc +import logging import time from typing import Dict, List, Optional, Set, Tuple @@ -13,6 +14,9 @@ import torch +logger = logging.getLogger("birdwatch.scorer") +logger.setLevel(logging.INFO) + _IN_GROUP = "inGroup" @@ -57,7 +61,7 @@ def time_block(self, label): yield finally: end = time.time() - print( + logger.info( f"{self.get_name()} {label} elapsed time: {end - start:.2f} secs ({((end-start)/60.0):.2f} mins)" ) @@ -113,13 +117,13 @@ def _filter_input( """ if (not self._includedGroups) and (not self._includedTopics): return ratings, noteStatusHistory - print(f"Filtering ratings for {self.get_name()}. Original rating length: {len(ratings)}") + logger.info(f"Filtering ratings for {self.get_name()}. Original rating length: {len(ratings)}") # Apply topic filter if self._includedTopics: notes = noteTopics[noteTopics[c.noteTopicKey].isin(self._includedTopics)][[c.noteIdKey]] ratings = ratings.merge(notes) noteStatusHistory = noteStatusHistory.merge(notes) - print(f" Ratings after topic filter: {len(ratings)}") + logger.info(f" Ratings after topic filter: {len(ratings)}") # Apply group filter if self._includedGroups: userEnrollment = userEnrollment[[c.participantIdKey, c.modelingGroupKey]].rename( @@ -131,10 +135,10 @@ def _filter_input( ratings = ratings.merge( userEnrollment[[c.raterParticipantIdKey, _IN_GROUP]], on=c.raterParticipantIdKey, how="left" ) - print(f" Ratings without assigned group: {ratings[_IN_GROUP].isna().sum()}") + logger.info(f" Ratings without assigned group: {ratings[_IN_GROUP].isna().sum()}") ratings = ratings.fillna({_IN_GROUP: self._includeUnassigned}) ratings = ratings[ratings[_IN_GROUP]].drop(columns=[_IN_GROUP]) - print(f" Ratings after group filter: {len(ratings)}") + logger.info(f" Ratings after group filter: {len(ratings)}") return ratings, noteStatusHistory def _postprocess_output( @@ -167,7 +171,7 @@ def _postprocess_output( if self._captureThreshold is None: return noteScores, userScores # Identify notes with enough ratings from within the modeling group. - print(f"Postprocessing output for {self.get_name()}") + logger.info(f"Postprocessing output for {self.get_name()}") assert self._includedGroups, "includedGroups must be set" userEnrollment = userEnrollment[[c.participantIdKey, c.modelingGroupKey]].rename( columns={c.participantIdKey: c.raterParticipantIdKey} @@ -180,11 +184,11 @@ def _postprocess_output( ) ratings = ratings.fillna({_IN_GROUP: self._includeUnassigned}) ratios = ratings[[c.noteIdKey, _IN_GROUP]].groupby(c.noteIdKey).mean().reset_index() - print(f" Original noteScores length: {len(noteScores)}") + logger.info(f" Original noteScores length: {len(noteScores)}") noteScores = noteScores.merge( ratios[ratios[_IN_GROUP] >= self._captureThreshold][[c.noteIdKey]] ) - print(f" Final noteScores length: {len(noteScores)}") + logger.info(f" Final noteScores length: {len(noteScores)}") return noteScores, userScores def _get_note_col_mapping(self) -> Dict[str, str]: @@ -247,7 +251,7 @@ def prescore(self, scoringArgs: PrescoringArgs, preserveRatings: bool = True) -> output that can be used to initialize and reduce the runtime of final scoring. """ torch.set_num_threads(self._threads) - print( + logger.info( f"prescore: Torch intra-op parallelism for {self.get_name()} set to: {torch.get_num_threads()}" ) # Transform input, run core scoring algorithm, transform output. @@ -344,7 +348,7 @@ def score_final(self, scoringArgs: FinalScoringArgs) -> ModelResult: c.scorerNameKey field of those dataframes. """ torch.set_num_threads(self._threads) - print( + logger.info( f"score_final: Torch intra-op parallelism for {self.get_name()} set to: {torch.get_num_threads()}" ) @@ -361,7 +365,7 @@ def score_final(self, scoringArgs: FinalScoringArgs) -> ModelResult: ].drop(columns=c.scorerNameKey, inplace=False) if self.get_name() not in scoringArgs.prescoringMetaOutput.metaScorerOutput: - print( + logger.info( f"Scorer {self.get_name()} not found in prescoringMetaOutput; returning empty scores from final scoring." ) return self._return_empty_final_scores() @@ -441,7 +445,7 @@ def score( This function is deprecated and only included for testing purposes for now. Not intended to be called in main code flow (since the scorer will be split, and this function calls both phases sequentially) """ - print( + logger.info( "CALLED DEPRECATED scorer.score() function. Prefer sequentially calling prescore() then score_final()." ) diff --git a/sourcecode/scoring/scoring_rules.py b/sourcecode/scoring/scoring_rules.py index aca07457..eeb52a47 100644 --- a/sourcecode/scoring/scoring_rules.py +++ b/sourcecode/scoring/scoring_rules.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from collections import namedtuple from enum import Enum +import logging from typing import Any, Callable, Dict, List, Optional, Set, Tuple from . import constants as c @@ -11,6 +12,9 @@ import pandas as pd +logger = logging.getLogger("birdwatch.scoring_rules") +logger.setLevel(logging.INFO) + RuleAndVersion = namedtuple("RuleAndVersion", ["ruleName", "ruleVersion", "lockingEnabled"]) """namedtuple identifying ScoringRule with a name and tracking revisions with a version.""" @@ -57,8 +61,6 @@ class RuleID(Enum): TOPIC_MODEL_2 = RuleAndVersion("TopicModel02", "1.0", False) TOPIC_MODEL_3 = RuleAndVersion("TopicModel03", "1.0", False) MULTI_GROUP_MODEL_1 = RuleAndVersion("MultiGroupModel01", "1.0", False) - MULTI_GROUP_MODEL_2 = RuleAndVersion("MultiGroupModel02", "1.0", False) - MULTI_GROUP_MODEL_3 = RuleAndVersion("MultiGroupModel03", "1.0", False) INSUFFICIENT_EXPLANATION = RuleAndVersion("InsufficientExplanation", "1.0", True) SCORING_DRIFT_GUARD = RuleAndVersion("ScoringDriftGuard", "1.0", False) NMR_DUE_TO_MIN_STABLE_CRH_TIME = RuleAndVersion("NmrDueToMinStableCrhTime", "1.0", False) @@ -270,7 +272,7 @@ def score_notes( [c.noteIdKey] ] noteStats = noteStats.merge(candidateNotes, on=c.noteIdKey, how="inner") - print(f"Candidate notes prior to tag filtering: {len(noteStats)}") + logger.info(f"Candidate notes prior to tag filtering: {len(noteStats)}") # Identify impacted notes. impactedNotes = pd.DataFrame.from_dict( @@ -279,13 +281,13 @@ def score_notes( c.activeFilterTagsKey: pd.Series([], dtype=object), } ) - print("Checking note tags:") + logger.info("Checking note tags:") for tag in c.notHelpfulTagsTSVOrder: adjustedColumn = f"{tag}{c.adjustedSuffix}" adjustedRatioColumn = f"{adjustedColumn}{c.ratioSuffix}" - print(tag) + logger.info(tag) if tag == c.notHelpfulHardToUnderstandKey: - print(f"outlier filtering disabled for tag: {tag}") + logger.info(f"outlier filtering disabled for tag: {tag}") continue tagFilteredNotes = noteStats[ # Adjusted total must pass minimum threhsold set across all tags. @@ -298,12 +300,12 @@ def score_notes( unsafeAllowed=[c.defaultIndexKey, c.activeFilterTagsKey], ) # log and consolidate imapcted notes - print(f"Total {{note, tag}} pairs where tag filter logic triggered: {len(impactedNotes)}") + logger.info(f"Total {{note, tag}} pairs where tag filter logic triggered: {len(impactedNotes)}") impactedNotes = impactedNotes.groupby(c.noteIdKey).aggregate(list).reset_index() impactedNotes[c.activeFilterTagsKey] = [ ",".join(tags) for tags in impactedNotes[c.activeFilterTagsKey] ] - print(f"Total unique notes impacted by tag filtering: {len(impactedNotes)}") + logger.info(f"Total unique notes impacted by tag filtering: {len(impactedNotes)}") noteStatusUpdates = impactedNotes[[c.noteIdKey]].drop_duplicates() noteStatusUpdates[statusColumn] = self._status return (noteStatusUpdates, impactedNotes) @@ -356,7 +358,7 @@ def score_notes( pd.testing.assert_frame_equal(noteStatusUpdates, noteStatusUpdates.drop_duplicates()) - print(f"Total notes impacted by incorrect filtering: {len(noteStatusUpdates)}") + logger.info(f"Total notes impacted by incorrect filtering: {len(noteStatusUpdates)}") noteStatusUpdates[statusColumn] = self._status return (noteStatusUpdates, None) @@ -400,7 +402,7 @@ def score_notes( pd.testing.assert_frame_equal(noteStatusUpdates, noteStatusUpdates.drop_duplicates()) - print(f"Total notes impacted by low diligence filtering: {len(noteStatusUpdates)}") + logger.info(f"Total notes impacted by low diligence filtering: {len(noteStatusUpdates)}") noteStatusUpdates[statusColumn] = self._status return (noteStatusUpdates, None) @@ -444,7 +446,7 @@ def score_notes( pd.testing.assert_frame_equal(noteStatusUpdates, noteStatusUpdates.drop_duplicates()) - print(f"Total notes impacted by large factor filtering: {len(noteStatusUpdates)}") + logger.info(f"Total notes impacted by large factor filtering: {len(noteStatusUpdates)}") noteStatusUpdates[statusColumn] = self._status return (noteStatusUpdates, None) @@ -551,7 +553,7 @@ def score_notes( ][[c.noteIdKey, newStatusColumn]] noteStatusUpdatesWithStatusChange.rename(columns={newStatusColumn: statusColumn}, inplace=True) - print( + logger.info( f"Total notes impacted (CRH->NMR) by NmrDueToMinStableCrhTime: " f"{len(noteStatusUpdatesWithStatusChange)}" ) @@ -788,7 +790,9 @@ def score_notes( | (currentLabels[statusColumn] == c.currentlyRatedNotHelpful) ][[c.noteIdKey]] crStats = noteStats.merge(crNotes, on=c.noteIdKey, how="inner") - print(f"CRH / CRNH notes prior to filtering for insufficient explanation: {len(crStats)}") + logger.info( + f"CRH / CRNH notes prior to filtering for insufficient explanation: {len(crStats)}" + ) # Identify impacted notes. noteStatusUpdates = crStats.loc[ @@ -798,7 +802,7 @@ def score_notes( pd.testing.assert_frame_equal(noteStatusUpdates, noteStatusUpdates.drop_duplicates()) - print(f"Total notes impacted by explanation filtering: {len(noteStatusUpdates)}") + logger.info(f"Total notes impacted by explanation filtering: {len(noteStatusUpdates)}") noteStatusUpdates[statusColumn] = self._status return (noteStatusUpdates, None) @@ -1019,7 +1023,7 @@ def apply_scoring_rules( # Successively apply each rule for rule in rules: with c.time_block(f"Applying scoring rule: {rule.get_name()}"): - print(f"Applying scoring rule: {rule.get_name()}") + logger.info(f"Applying scoring rule: {rule.get_name()}") rule.check_dependencies(ruleIDs) assert rule.get_rule_id() not in ruleIDs, f"repeat ruleID: {rule.get_name()}" ruleIDs.add(rule.get_rule_id()) diff --git a/sourcecode/scoring/tag_consensus.py b/sourcecode/scoring/tag_consensus.py index 6c1e4b8a..e51b61d6 100644 --- a/sourcecode/scoring/tag_consensus.py +++ b/sourcecode/scoring/tag_consensus.py @@ -1,3 +1,4 @@ +import logging from typing import Optional from . import constants as c, process_data @@ -6,6 +7,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.tag_consensus") +logger.setLevel(logging.INFO) + + def train_tag_model( ratings: pd.DataFrame, tag: str = c.notHelpfulSpamHarassmentOrAbuseTagKey, @@ -14,16 +19,16 @@ def train_tag_model( useSigmoidCrossEntropy: bool = True, name: Optional[str] = None, ): - print(f"-------------------Training for tag {tag}-------------------") + logger.info(f"-------------------Training for tag {tag}-------------------") ratingDataForTag, labelColName = prepare_tag_data(ratings, tag) if ratingDataForTag is None or len(ratingDataForTag) == 0: - print(f"No valid data for {tag}, returning None and aborting {tag} model training.") + logger.info(f"No valid data for {tag}, returning None and aborting {tag} model training.") return None, None, None posRate = ratingDataForTag[labelColName].sum() / len(ratingDataForTag) - print(f"{tag} Positive Rate: {posRate}") + logger.info(f"{tag} Positive Rate: {posRate}") if pd.isna(posRate) or posRate == 0 or posRate == 1: - print( + logger.info( f"{tag} tag positive rate is {posRate}: returning None and aborting {tag} model training." ) return None, None, None @@ -105,16 +110,16 @@ def prepare_tag_data( # Positives ratings.loc[ratings[tagName] == 1, labelColName] = 1 - print("Pre-filtering tag label breakdown", ratings.groupby(labelColName).size()) - print("Number of rows with no tag label", ratings[labelColName].isnull().sum()) + logger.info(f"Pre-filtering tag label breakdown {ratings.groupby(labelColName).size()}") + logger.info(f"Number of rows with no tag label {ratings[labelColName].isnull().sum()}") # Currently leave in raters who only made one type of rating, but can throw them out in the future. ratings = process_data.filter_ratings( ratings[ratings[labelColName].notnull()], minNumRatingsPerRater, minNumRatersPerNote ) - print("Post-filtering tag label breakdown", ratings.groupby(labelColName).size()) - print("Number of rows with no tag label", ratings[labelColName].isnull().sum()) + logger.info(f"Post-filtering tag label breakdown {ratings.groupby(labelColName).size()}") + logger.info(f"Number of rows with no tag label {ratings[labelColName].isnull().sum()}") ratings[labelColName] = ratings[labelColName].astype(int) diff --git a/sourcecode/scoring/tag_filter.py b/sourcecode/scoring/tag_filter.py index 1b2c745a..c09704ac 100644 --- a/sourcecode/scoring/tag_filter.py +++ b/sourcecode/scoring/tag_filter.py @@ -1,5 +1,6 @@ """Utilites for tag based scoring logic.""" +import logging from typing import Dict from . import constants as c @@ -8,6 +9,10 @@ import pandas as pd +logger = logging.getLogger("birdwatch.tag_filter") +logger.setLevel(logging.INFO) + + def _normalize_factors(rawFactors: pd.DataFrame, entityKey: str, factorKey: str) -> pd.DataFrame: """Performs Z-Normalization on embedding factors. @@ -133,7 +138,7 @@ def get_tag_thresholds(ratings: pd.DataFrame, percentile: int) -> Dict[str, floa thresholds = {} for column in c.notHelpfulTagsAdjustedRatioColumns: if len(ratings[column]) == 0: - print( + logger.info( f"Warning: No ratings for column {column} in get_tag_thresholds. Setting threshold to 0.0 arbitrarily." ) thresholds[column] = 0.0 diff --git a/sourcecode/scoring/topic_model.py b/sourcecode/scoring/topic_model.py index c14f78d2..c7edb9be 100644 --- a/sourcecode/scoring/topic_model.py +++ b/sourcecode/scoring/topic_model.py @@ -9,6 +9,7 @@ evaluates the efficacy of per-topic note scoring. """ +import logging import re from typing import List, Optional, Tuple @@ -23,6 +24,10 @@ from sklearn.pipeline import Pipeline +logger = logging.getLogger("birdwatch.topic_model") +logger.setLevel(logging.INFO) + + class TopicModel(object): def __init__(self): """Initialize a list of seed terms for each topic.""" @@ -85,7 +90,7 @@ def _make_seed_labels(self, texts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: conflictedTexts[i] = True unassigned_count = np.sum(conflictedTexts) - print(f" Notes unassigned due to multiple matches: {unassigned_count}") + logger.info(f" Notes unassigned due to multiple matches: {unassigned_count}") return labels, conflictedTexts def _get_stop_words(self, texts: np.ndarray) -> List[str]: @@ -105,15 +110,15 @@ def _get_stop_words(self, texts: np.ndarray) -> List[str]: cv = CountVectorizer(strip_accents="unicode") cv.fit(texts) rawVocabulary = cv.vocabulary_.keys() - print(f" Initial vocabulary length: {len(rawVocabulary)}") + logger.info(f" Initial vocabulary length: {len(rawVocabulary)}") # Identify stop words blockedTokens = set() for terms in self._seedTerms.values(): # Remove whitespace and any escaped characters from terms blockedTokens |= {re.sub(r"\\.", "", t.strip()) for t in terms} - print(f" Total tokens to filter: {len(blockedTokens)}") + logger.info(f" Total tokens to filter: {len(blockedTokens)}") stopWords = [v for v in rawVocabulary if any(t in v for t in blockedTokens)] - print(f" Total identified stopwords: {len(stopWords)}") + logger.info(f" Total identified stopwords: {len(stopWords)}") return stopWords def _merge_predictions_and_labels( @@ -208,11 +213,11 @@ def get_note_topics( Args: notes: DF containing all notes to potentially assign to a topic """ - print("Assigning notes to topics:") + logger.info("Assigning notes to topics:") if noteTopicClassifier is not None: pipe = noteTopicClassifier else: - print("Training note topic classifier") + logger.info("Training note topic classifier") pipe, seedLabels, conflictedTextsForAccuracyEval = self.train_note_topic_classifier(notes) postText = self._prepare_post_text(notes) @@ -234,7 +239,7 @@ def get_note_topics( with c.time_block("Get Note Topics: Merge and assign predictions"): pred = self._merge_predictions_and_labels(pred, seedLabels) - print(f" Topic assignment results: {np.bincount(pred)}") + logger.info(f" Topic assignment results: {np.bincount(pred)}") # Assign topics to notes based on aggregated note text, and drop any # notes on posts that were unassigned. @@ -247,7 +252,7 @@ def get_note_topics( def validate_note_topic_accuracy_on_seed_labels(self, pred, seedLabels, conflictedTexts): balancedAccuracy = balanced_accuracy_score(seedLabels[~conflictedTexts], pred[~conflictedTexts]) - print(f" Balanced accuracy on raw predictions: {balancedAccuracy}") + logger.info(f" Balanced accuracy on raw predictions: {balancedAccuracy}") assert balancedAccuracy > 0.5, f"Balanced accuracy too low: {balancedAccuracy}" # Validate that any conflicted text is Unassigned in seedLabels assert all(seedLabels[conflictedTexts] == Topics.Unassigned.value)