diff --git a/README.md b/README.md index 5e6d3238..3cb4511a 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ python src/psycopt2d/train_model.py --config-name test_config.yaml +model=xgboos To test new integrations with WandB: ```python -python src/psycopt2d/train_model.py +model=xgboost project.wandb_mode="run" --config-name integration_testing.yaml +python src/psycopt2d/train_model.py +model=xgboost project.wandb.mode="run" --config-name integration_testing.yaml ``` diff --git a/application/train_and_log_models.py b/application/train_and_log_models.py new file mode 100644 index 00000000..c5b5e226 --- /dev/null +++ b/application/train_and_log_models.py @@ -0,0 +1,270 @@ +"""Example script to train multiple models and subsequently log the results to +wandb. + +Usage: +- Replace the HYDRA_ARGS string with the desired arguments for `train_model.py` +- Run this script from project root with `python src/psycopt2d/train_and_log_models.py +""" +import random +import subprocess +import time +from pathlib import Path +from typing import Union + +import pandas as pd +from hydra import compose, initialize +from pydantic import BaseModel +from wasabi import Printer + +from psycopt2d.evaluate_saved_model_predictions import ( + infer_look_distance, + infer_outcome_col_name, + infer_predictor_col_name, +) +from psycopt2d.utils.configs import FullConfig, omegaconf_to_pydantic_objects + +msg = Printer(timestamp=True) + + +class LookDistance(BaseModel): + """A distance of ahead and behind.""" + + behind_days: Union[int, float] + ahead_days: Union[int, float] + + +def load_train_raw(cfg: FullConfig): + """Load the data.""" + path = Path(cfg.data.dir) + file_names = list(path.glob(pattern=r"*train*")) + + if len(file_names) == 1: + file_name = file_names[0] + file_suffix = file_name.suffix + if file_suffix == ".parquet": + return pd.read_parquet(file_name) + elif file_suffix == ".csv": + return pd.read_csv(file_name) + + raise ValueError(f"Returned {len(file_names)} files") + + +def infer_possible_look_distances(df: pd.DataFrame) -> list[LookDistance]: + """Infer the possible values for min_lookahead_days and + min_lookbehind_days.""" + # Get potential lookaheads from outc_ columns + outcome_col_names = infer_outcome_col_name(df=df, allow_multiple=True) + + possible_lookahead_days = infer_look_distance(col_name=outcome_col_names) + + # Get potential lookbehinds from pred_ columns + pred_col_names = infer_predictor_col_name(df=df, allow_multiple=True) + possible_lookbehind_days = list(set(infer_look_distance(col_name=pred_col_names))) + + return [ + LookDistance( + behind_days=lookbehind_days, + ahead_days=lookahead_days, + ) + for lookahead_days in possible_lookahead_days + for lookbehind_days in possible_lookbehind_days + ] + + +def start_trainer( + cfg: FullConfig, + config_file_name: str, + cell: LookDistance, + wandb_group_override: str, +) -> subprocess.Popen: + """Start a trainer.""" + subprocess_args: list[str] = [ + "python", + "src/psycopt2d/train_model.py", + f"model={cfg.model.model_name}", + f"data.min_lookbehind_days={max(cfg.data.lookbehind_combination)}", + f"data.min_lookahead_days={cell.ahead_days}", + f"project.wandb.group='{wandb_group_override}'", + f"hydra.sweeper.n_trials={cfg.train.n_trials_per_lookdirection_combination}", + f"project.wandb.mode={cfg.project.wandb.mode}", + "--config-name", + f"{config_file_name}", + ] + + if cfg.train.n_trials_per_lookdirection_combination > 1: + subprocess_args.insert(2, "--multirun") + + if cfg.model.model_name == "xgboost" and not 
cfg.train.gpu: + subprocess_args.insert(3, "++model.args.tree_method='auto'") + + msg.info(f'{" ".join(subprocess_args)}') + + return subprocess.Popen( # pylint: disable=consider-using-with + args=subprocess_args, + ) + + +def start_watcher(cfg: FullConfig) -> subprocess.Popen: + """Start a watcher.""" + return subprocess.Popen( # pylint: disable=consider-using-with + [ + "python", + "src/psycopt2d/model_training_watcher.py", + "--entity", + cfg.project.wandb.entity, + "--project_name", + cfg.project.name, + "--n_runs_before_eval", + str(cfg.project.watcher.n_runs_before_eval), + "--overtaci", + str(cfg.eval.save_model_predictions_on_overtaci), + "--timeout", + "None", + "--clean_wandb_dir", + str(cfg.project.watcher.archive_all), + "--verbose", + "True", + ], + ) + + +def train_models_for_each_cell_in_grid( + cfg: FullConfig, + possible_look_distances: list[LookDistance], + config_file_name: str, +): + """Train a model for each cell in the grid of possible look distances.""" + from random_word import RandomWords + + random_word = RandomWords() + + random.shuffle(possible_look_distances) + + active_trainers: list[subprocess.Popen] = [] + + wandb_prefix = f"{random_word.get_random_word()}-{random_word.get_random_word()}" + + while possible_look_distances or active_trainers: + # Wait until there is a free slot in the trainers group + if len(active_trainers) >= cfg.train.n_active_trainers: + # Drop trainers if they have finished + # If finished, t.poll() is not None + active_trainers = [t for t in active_trainers if t.poll() is None] + time.sleep(1) + continue + + # Start a new trainer + + combination = possible_look_distances.pop() + + msg.info( + f"Spawning a new trainer with lookbehind={combination.behind_days} and lookahead={combination.ahead_days}", + ) + wandb_group = f"{wandb_prefix}" + + active_trainers.append( + start_trainer( + cfg=cfg, + config_file_name=config_file_name, + cell=combination, + wandb_group_override=wandb_group, + ), + ) + + +def load_cfg(config_file_name) -> FullConfig: + """Load config as pydantic object.""" + with initialize(version_base=None, config_path="../src/psycopt2d/config/"): + cfg = compose( + config_name=config_file_name, + ) + + cfg = omegaconf_to_pydantic_objects(cfg) + return cfg + + +def get_possible_look_distances( + msg: Printer, + cfg: FullConfig, + train: pd.DataFrame, +) -> list[LookDistance]: + """Some look_ahead and look_behind distances will result in 0 valid + prediction times. Only return combinations which will allow some prediction + times. + + E.g. if we only have 4 years of data: + - min_lookahead = 2 years + - min_lookbehind = 3 years + + Will mean that no rows satisfy the criteria. 
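The docstring above states the pruning rule in words; the following standalone sketch shows the same arithmetic on toy data. The timestamps and candidate distances are invented for illustration and only `pandas` is assumed; this is not the project's `get_possible_look_distances`.

```python
from itertools import product

import pandas as pd

# Toy prediction times spanning roughly four years, as in the docstring example.
pred_times = pd.Series(pd.date_range("2014-01-01", "2017-12-31", freq="90D"))
max_span_days = (pred_times.max() - pred_times.min()).days

candidate_lookbehinds = [30, 365, 3 * 365]  # min_lookbehind_days candidates
candidate_lookaheads = [365, 2 * 365]  # min_lookahead_days candidates

# A (behind, ahead) pair can only leave valid prediction times if the two
# distances together fit inside the dataset's total time span.
valid_combinations = [
    (behind, ahead)
    for behind, ahead in product(candidate_lookbehinds, candidate_lookaheads)
    if behind + ahead < max_span_days
]

print(f"Dataset spans {max_span_days} days; valid combinations: {valid_combinations}")
```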
+    """
+
+    look_combinations_in_dataset = infer_possible_look_distances(df=train)
+
+    # Don't try look distance combinations which will result in 0 rows
+    max_distance_in_dataset_days = (
+        max(train[cfg.data.pred_timestamp_col_name])
+        - min(
+            train[cfg.data.pred_timestamp_col_name],
+        )
+    ).days
+
+    look_combinations_without_rows = [
+        dist
+        for dist in look_combinations_in_dataset
+        if (dist.ahead_days + dist.behind_days) > max_distance_in_dataset_days
+    ]
+
+    msg.info(
+        f"Not fitting model to {look_combinations_without_rows}, since no rows satisfy the criteria.",
+    )
+
+    look_combinations_with_rows = [
+        dist
+        for dist in look_combinations_in_dataset
+        if ((dist.ahead_days + dist.behind_days) < max_distance_in_dataset_days)
+    ]
+
+    return look_combinations_with_rows
+
+
+def main():
+    """Main."""
+    msg = Printer(timestamp=True)
+
+    config_file_name = "default_config.yaml"
+
+    cfg = load_cfg(config_file_name=config_file_name)
+
+    # Load dataset without dropping any rows for inferring
+    # which look distances to grid search over
+    train = load_train_raw(cfg=cfg)
+    possible_look_distances = get_possible_look_distances(msg, cfg, train)
+
+    if not cfg.train.gpu:
+        msg.warn("Not using GPU for training")
+
+    if cfg.project.wandb.mode == "run":
+        msg.warn(
+            f"wandb.mode is {cfg.project.wandb.mode}, not using the watcher. This will substantially slow down training.",
+        )
+    else:
+        watcher = start_watcher(cfg=cfg)
+
+    train_models_for_each_cell_in_grid(
+        cfg=cfg,
+        possible_look_distances=possible_look_distances,
+        config_file_name=config_file_name,
+    )
+
+    if cfg.project.wandb.mode != "run":
+        msg.good(
+            f"Training finished. Stopping the watcher in {cfg.project.watcher.keep_alive_after_training_minutes} minutes...",
+        )
+
+        time.sleep(60 * cfg.project.watcher.keep_alive_after_training_minutes)
+        watcher.kill()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/docs/conf.py b/docs/conf.py
index b420ec84..e898c604 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -24,7 +24,7 @@
 
 # Add any Sphinx extension module names here, as strings. They can be extensions
 # coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
-extensions = []
+extensions = []  # type: ignore
 
 # Add any paths that contain templates here, relative to this directory.
 templates_path = ["_templates"]
@@ -167,7 +167,7 @@
 
 # -- Options for LaTeX output --------------------------------------------------
 
-latex_elements = {
+latex_elements = {  # type: ignore
     # The paper size ('letterpaper' or 'a4paper').
     # 'papersize': 'letterpaper',
     # The font size ('10pt', '11pt' or '12pt').
diff --git a/pyproject.toml b/pyproject.toml
index f899117f..be1f23bb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,7 @@
 selenium = ">=4.2.0,<4.6.0" # See https://github.com/Aarhus-Psychiatry-Research/psycop-t2d/pull/194 for thoughts on root cause
 seaborn = ">=0.12.0,<0.12.2"
 pyarrow = ">=9.0.0, <9.1.0"
+Random-Word = "^1.0.11"
 
 [tool.poetry.dev-dependencies]
diff --git a/reports/render_report.py b/reports/render_report.py
index e62d906f..0204127e 100644
--- a/reports/render_report.py
+++ b/reports/render_report.py
@@ -9,7 +9,7 @@
 
 import pandas as pd
 
-from psycopt2d.utils import PROJECT_ROOT
+from psycopt2d.utils.utils import PROJECT_ROOT
 
 # import pandoc  # See comment in pyproject.toml on Pandoc, not currently in use.
Should work now, see: https://github.com/boisgera/pandoc/pull/49#issuecomment-1265983279 diff --git a/src/psycopt2d/config/data/synth_data.yaml b/src/psycopt2d/config/data/synth_data.yaml index 316235ae..676acce5 100644 --- a/src/psycopt2d/config/data/synth_data.yaml +++ b/src/psycopt2d/config/data/synth_data.yaml @@ -1,20 +1,26 @@ # @package _global_ data: + dir: tests/test_data/synth_splits + suffix: csv n_training_samples: null - min_lookahead_days: null + min_lookahead_days: 30 + min_lookbehind_days: 100 min_prediction_time_date: null - lookahead_days: 30 - pred_col_name_prefix: "pred_" + pred_col_name_prefix: pred_ pred_timestamp_col_name: timestamp outcome_timestamp_col_name: timestamp_outcome id_col_name: citizen_ids - source: synthetic - min_lookbehind_days: null + + # Looking ahead + lookahead_days: 30 drop_patient_if_outcome_before_date: null - lookbehind_combination: null + + # Looking behind + max_lookbehind_days: 1850 + lookbehind_combination: [30, 60, 100] # Parameters that will only take effect if running with --multirun hydra: sweeper: params: - ++data.lookbehind_combinations: choice([30, 90], [30]) + data.lookbehind_combination: choice([30, 90], [30]) diff --git a/src/psycopt2d/config/data/t2d_parquet.yaml b/src/psycopt2d/config/data/t2d_parquet.yaml index cd59b629..6f3d394a 100644 --- a/src/psycopt2d/config/data/t2d_parquet.yaml +++ b/src/psycopt2d/config/data/t2d_parquet.yaml @@ -1,27 +1,28 @@ # @package _global_ data: # General config - n_training_samples: null # (int, null): Number of training samples to use, defaults to null in which cases it uses all samples. - dir: E:\shared_resources\feature_sets\t2d\feature_sets\psycop_t2d_adminmanber_201_features_2022_10_05_15_14 - source: parquet # Where to load data from. Takes "sql" or "synthetic" + n_training_samples: null + dir: E:\shared_resources\feature_sets\t2d\feature_sets\psycop_t2d_adminmanber_416_features_2022_10_20_11_12 + suffix: parquet - # Feature specs - pred_col_name_prefix: "pred_" # (str): prefix of predictor columns - pred_timestamp_col_name: timestamp # (str): Column name for prediction times - outcome_timestamp_col_name: _timestamp_first_t2d # (str): Column name for outcome timestamps - id_col_name: dw_ek_borger # (str): Citizen colnames - - # Looking ahead - lookahead_days: 1825 # (float): Number of days from prediction time to look ahead for the outcome. 
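The `dir`/`suffix` pair added to the data configs above is what `load_train_raw` (and the `DataLoader` further down) use to locate and read a split. A small illustration of that dispatch; argument names and paths are hypothetical, not the project's API.

```python
from pathlib import Path

import pandas as pd


def load_split(data_dir: str, split: str, suffix: str) -> pd.DataFrame:
    """Locate a split by globbing for its name and dispatch on the configured suffix.

    Mirrors the pattern in `load_train_raw` above; illustrative only.
    """
    files = list(Path(data_dir).glob(f"*{split}*.{suffix}"))
    if len(files) != 1:
        raise ValueError(f"Expected exactly one {split} file, found {len(files)}")
    if suffix == "parquet":
        return pd.read_parquet(files[0])
    if suffix == "csv":
        return pd.read_csv(files[0])
    raise ValueError(f"Unsupported suffix: {suffix}")


# Example, assuming the synthetic splits referenced in synth_data.yaml:
# df = load_split("tests/test_data/synth_splits", split="train", suffix="csv")
```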
- min_lookahead_days: 1825 # (int): Drop all prediction times where (max timestamp in the dataset) - (current timestamp) is less than min_lookahead_days + # Patient exclusion criteria drop_patient_if_outcome_before_date: 2013-01-01 - # Looking behind + # Prediction time exclusion criteria min_prediction_time_date: 2013-01-01 - min_lookbehind_days: 365 # (int): Drop all prediction times where (prediction_timestamp) - (min timestamp in the dataset) is less than min_lookbehind_days + min_lookbehind_days: 730 + min_lookahead_days: 1825 + + # Feature specs + pred_col_name_prefix: "pred_" + pred_timestamp_col_name: timestamp + outcome_timestamp_col_name: _timestamp_first_t2d + id_col_name: dw_ek_borger + max_lookbehind_days: 3650 + lookbehind_combination: [30, 90, 180, 365, 730] # Parameters that will only take effect if running with --multirun hydra: sweeper: params: - ++data.lookbehind_combinations: choice([30, 90], [30]) + ++data.lookbehind_combination: choice([30, 90, 180, 365, 730], [30, 180, 730], [730], [365], [90], [30]) diff --git a/src/psycopt2d/config/default_config.yaml b/src/psycopt2d/config/default_config.yaml index 5e318e27..c62edfc1 100644 --- a/src/psycopt2d/config/default_config.yaml +++ b/src/psycopt2d/config/default_config.yaml @@ -1,8 +1,9 @@ # @package _global_ defaults: - - project: overtaci_test_project + - project: default_project - data: t2d_parquet - preprocessing: default_preprocessing - - training: default_training - - evaluation: default_evaluation - - sweeper: optuna_singlethread \ No newline at end of file + - model: xgboost + - train: default_training + - eval: default_evaluation + - sweeper: optuna_singlethread diff --git a/src/psycopt2d/config/evaluation/default_evaluation.yaml b/src/psycopt2d/config/eval/default_evaluation.yaml similarity index 100% rename from src/psycopt2d/config/evaluation/default_evaluation.yaml rename to src/psycopt2d/config/eval/default_evaluation.yaml diff --git a/src/psycopt2d/config/evaluation/evaluation_synth.yaml b/src/psycopt2d/config/eval/evaluation_synth.yaml similarity index 100% rename from src/psycopt2d/config/evaluation/evaluation_synth.yaml rename to src/psycopt2d/config/eval/evaluation_synth.yaml diff --git a/src/psycopt2d/config/integration_testing.yaml b/src/psycopt2d/config/integration_testing.yaml index 5a3c6d52..6b860e1b 100644 --- a/src/psycopt2d/config/integration_testing.yaml +++ b/src/psycopt2d/config/integration_testing.yaml @@ -3,6 +3,7 @@ defaults: - project: integration_test_project - data: synth_data - preprocessing: default_preprocessing - - training: default_training - - evaluation: evaluation_synth + - train: default_training + - model: xgboost + - eval: evaluation_synth - sweeper: optuna_singlethread diff --git a/src/psycopt2d/config/overtaci_testing.yaml b/src/psycopt2d/config/overtaci_testing.yaml deleted file mode 100644 index 39aca780..00000000 --- a/src/psycopt2d/config/overtaci_testing.yaml +++ /dev/null @@ -1,8 +0,0 @@ -# @package _global_ -defaults: - - project: overtaci_test_project - - data: t2d_parquet - - preprocessing: default_preprocessing - - training: default_training - - evaluation: default_evaluation - - sweeper: optuna_singlethread diff --git a/src/psycopt2d/config/preprocessing/default_preprocessing.yaml b/src/psycopt2d/config/preprocessing/default_preprocessing.yaml index 0d33340a..1197da95 100644 --- a/src/psycopt2d/config/preprocessing/default_preprocessing.yaml +++ b/src/psycopt2d/config/preprocessing/default_preprocessing.yaml @@ -2,6 +2,7 @@ convert_to_boolean: False # (Boolean): 
Convert all prediction values (except gen convert_datetimes_to: False # (str): Options include ordinal or False imputation_method: "most_frequent" # (str): Options include 2most_frequent" transform: null # (str|null): Transformation applied to all predictors after imputation. Options include "z-score-normalization" -feature_selection_method: null -feature_selection_params: - percentile: 10 # (int): Percent of features to keep. Defaults to 10. +feature_selection: + name: null + params: + percentile: 10 # (int): Percent of features to keep. Defaults to 10. diff --git a/src/psycopt2d/config/project/default_project.yaml b/src/psycopt2d/config/project/default_project.yaml index a40240dc..60ef0af8 100644 --- a/src/psycopt2d/config/project/default_project.yaml +++ b/src/psycopt2d/config/project/default_project.yaml @@ -1,4 +1,13 @@ name: psycop-t2d seed: 42 -wandb_mode: "run" # Which mode to run WanDB in. Takes "run", "dryrun", "offline" and "disabled" -wandb_entity: "psycop" # Optional[str] + +wandb: + entity: "psycop" # Which entity to run WanDB in. + mode: "offline" # Which mode to run WanDB in. Takes "run", "dryrun", "offline" and "disabled" + group: "psycop-t2d" # Which group to run WanDB in. + +watcher: + archive_all: false + keep_alive_after_training_minutes: 5 + n_runs_before_eval: 1 + verbose: true diff --git a/src/psycopt2d/config/project/integration_test_project.yaml b/src/psycopt2d/config/project/integration_test_project.yaml index e6ad4b7a..dceda704 100644 --- a/src/psycopt2d/config/project/integration_test_project.yaml +++ b/src/psycopt2d/config/project/integration_test_project.yaml @@ -1,4 +1,12 @@ name: psycop-t2d-integration-testing seed: 42 -wandb_mode: "disabled" # Which mode to run WanDB in. Takes "run", "dryrun", "offline" and "disabled" -wandb_entity: "psycop" # Optional[str] +wandb: + mode: "offline" # Which mode to run WanDB in. Takes "run", "dryrun", and "disabled" + group: "integration_testing" + entity: "psycop-t2d-testing" # Which entity to run WanDB in. +watcher: + archive_all: true + keep_alive_after_training_minutes: 5 + n_runs_before_eval: 1 + verbose: true +gpu: false diff --git a/src/psycopt2d/config/project/overtaci_test_project.yaml b/src/psycopt2d/config/project/overtaci_test_project.yaml index 373e19af..716c9943 100644 --- a/src/psycopt2d/config/project/overtaci_test_project.yaml +++ b/src/psycopt2d/config/project/overtaci_test_project.yaml @@ -1,4 +1,5 @@ name: psycop-t2d-testing seed: 42 -wandb_mode: "run" # Which mode to run WanDB in. Takes "run", "dryrun", "offline" and "disabled" +wandb_mode: "offline" # Which mode to run WanDB in. 
Takes "run", "dryrun", "offline" and "disabled" +wandb_group: "psycop-t2d" wandb_entity: "psycop" # Optional[str] diff --git a/src/psycopt2d/config/sweep_xgboost.yaml b/src/psycopt2d/config/sweep_xgboost.yaml deleted file mode 100644 index 0295eb9a..00000000 --- a/src/psycopt2d/config/sweep_xgboost.yaml +++ /dev/null @@ -1,8 +0,0 @@ -# @package _global_ -defaults: - - project: default_project - - data: all_csv - - preprocessing: default_preprocessing - - training: default_training - - evaluation: default_evaluation - - sweeper: optuna_singlethread \ No newline at end of file diff --git a/src/psycopt2d/config/training/default_training.yaml b/src/psycopt2d/config/train/default_training.yaml similarity index 55% rename from src/psycopt2d/config/training/default_training.yaml rename to src/psycopt2d/config/train/default_training.yaml index 932506fc..f5378cb6 100644 --- a/src/psycopt2d/config/training/default_training.yaml +++ b/src/psycopt2d/config/train/default_training.yaml @@ -1 +1,4 @@ n_splits: 3 # (int, Null): Number of k-folds during CV. If Null, loads pre-defined dataset. +n_trials_per_lookdirection_combination: 20 +n_active_trainers: 1 +gpu: true diff --git a/src/psycopt2d/dataclasses/configs.py b/src/psycopt2d/configs.py similarity index 85% rename from src/psycopt2d/dataclasses/configs.py rename to src/psycopt2d/configs.py index e809d072..3e4beb6c 100644 --- a/src/psycopt2d/dataclasses/configs.py +++ b/src/psycopt2d/configs.py @@ -2,9 +2,10 @@ from typing import Optional import pandas as pd -from omegaconf import DictConfig from pydantic import BaseModel +from psycopt2d.utils.configs import FullConfig + # pylint: disable=missing-class-docstring, too-few-public-methods @@ -15,5 +16,5 @@ class Config: arbitrary_types_allowed = True df: pd.DataFrame - cfg: DictConfig + cfg: FullConfig feature_importance_dict: Optional[dict[str, float]] = None diff --git a/src/psycopt2d/evaluate_saved_model_predictions.py b/src/psycopt2d/evaluate_saved_model_predictions.py index 87cce112..2b312724 100644 --- a/src/psycopt2d/evaluate_saved_model_predictions.py +++ b/src/psycopt2d/evaluate_saved_model_predictions.py @@ -6,22 +6,73 @@ - Evaluate all models in 'evaluation_results' folder - CLI for evaluating a model """ +import re +from collections.abc import Iterable +from pathlib import Path +from typing import Union -from psycopt2d.utils import ( +import pandas as pd +from omegaconf import DictConfig + +from psycopt2d.utils.utils import ( PROJECT_ROOT, infer_outcome_col_name, + infer_predictor_col_name, infer_y_hat_prob_col_name, load_evaluation_data, + read_pickle, ) from psycopt2d.visualization import plot_auc_by_time_from_first_visit + +def infer_look_distance( + col_name: Union[Iterable[str], str], + regex_pattern: str = r"within_(\d+)_days", + allow_multiple: bool = True, +) -> list[str]: + """Infer look distances from col names.""" + # E.g. "outc_within_1_days" = 1 + # E.g. "outc_within_2_days" = 2 + # E.g. "pred_within_3_days" = 3 + # E.g. 
"pred_within_3_days" = 3 + + look_distances: list[str] = [] + + if isinstance(col_name, Iterable) and not isinstance(col_name, str): + for c_name in col_name: + look_distances += infer_look_distance( + col_name=c_name, + regex_pattern=regex_pattern, + ) + else: + look_distances = re.findall(pattern=regex_pattern, string=col_name) + + if len(look_distances) > 1 and not allow_multiple: + raise ValueError( + f"Multiple col names provided and allow_multiple is {allow_multiple}.", + ) + + return look_distances + + +def load_model_predictions_and_cfg(path: Path) -> tuple[pd.DataFrame, DictConfig]: + """Load model predictions and config file from a pickle file. + + Args: + path (Path): Path to pickle file + """ + obj = read_pickle(path) + return obj["df"], obj["cfg"] + + if __name__ == "__main__": eval_path = PROJECT_ROOT / "evaluation_results" # change to whatever modele you wish to evaluate eval_data = load_evaluation_data(eval_path / "2022_10_18_13_23_2h3cxref") - y_col_name = infer_outcome_col_name(eval_data.df) + train_col_names = infer_predictor_col_name(df=eval_data.df) + y_col_name = infer_outcome_col_name(df=eval_data.df) y_hat_prob_name = infer_y_hat_prob_col_name(eval_data.df) diff --git a/src/psycopt2d/evaluation.py b/src/psycopt2d/evaluation.py index b57e9c3f..003c959e 100644 --- a/src/psycopt2d/evaluation.py +++ b/src/psycopt2d/evaluation.py @@ -7,7 +7,6 @@ import pandas as pd from omegaconf.dictconfig import DictConfig from sklearn.metrics import recall_score, roc_auc_score -from sklearn.pipeline import Pipeline from wandb.sdk.wandb_run import Run as wandb_run # pylint: disable=no-name-in-module from wasabi import Printer @@ -15,8 +14,8 @@ from psycopt2d.tables.performance_by_threshold import ( generate_performance_by_positive_rate_table, ) -from psycopt2d.tables.tables import feature_selection_table -from psycopt2d.utils import PROJECT_ROOT, positive_rate_to_pred_probs +from psycopt2d.utils.configs import FullConfig +from psycopt2d.utils.utils import PROJECT_ROOT, positive_rate_to_pred_probs from psycopt2d.visualization import ( plot_auc_by_time_from_first_visit, plot_feature_importances, @@ -39,7 +38,7 @@ def log_feature_importances( feature_importance_plot_path = plot_feature_importances( feature_names=feature_importance_dict.keys(), feature_importances=feature_importance_dict.values(), - top_n_feature_importances=cfg.evaluation.top_n_feature_importances, + top_n_feature_importances=cfg.eval.top_n_feature_importances, save_path=save_path, ) @@ -55,24 +54,14 @@ def log_feature_importances( def evaluate_model( - cfg, + cfg: FullConfig, eval_df: pd.DataFrame, - pipe: Pipeline, y_col_name: str, - train_col_names: Iterable[str], y_hat_prob_col_name: str, run: wandb_run, feature_importance_dict: Optional[dict[str, float]], ) -> None: """Runs the evaluation suite on the model and logs to WandB. - At present, this includes: - 1. AUC - 2. Table of performance by pred_proba threshold - 3. Feature importance - 4. Sensitivity by time to outcome - 5. AUC by calendar time - 6. AUC by time from first visit - 7. F1 by time until diagnosis Args: cfg (OmegaConf): The hydra config from the run @@ -84,13 +73,16 @@ def evaluate_model( run (wandb_run): WandB run to log to. feature_importance_dict (Optional[dict[str, float]]): Dict of feature names and their importance. If None, will not log feature importance. + selected_features (Optional[list[str]]): List of selected features after preprocessing. + Used for plotting. 
""" msg = Printer(timestamp=True) msg.info("Starting model evaluation") SAVE_DIR = PROJECT_ROOT / ".tmp" # pylint: disable=invalid-name - # When parallelising tests, this causes issues since multiple processes + # When running tests in parallel with pytest-xdist, + # this causes issues since multiple processes # override the same dir at once. # Can be solved by allowing config to override this # and using tmp_dir in pytest. Not worth refactoring @@ -107,34 +99,18 @@ def evaluate_model( pred_timestamps = eval_df[cfg.data.pred_timestamp_col_name] y_hat_int = np.round(y_hat_probs, 0) - if "feature_selection" in pipe["preprocessing"].named_steps: - selected_features = ( - eval_df[train_col_names] - .columns[pipe["preprocessing"]["feature_selection"].get_support()] - .to_list() - ) - - run.log( - { - "feature_selection_table": feature_selection_table( - feature_names=train_col_names, - selected_feature_names=selected_features, - ), - }, - ) - - date_bins_ahead: Iterable[int] = cfg.evaluation.date_bins_ahead - date_bins_behind: Iterable[int] = cfg.evaluation.date_bins_behind + date_bins_ahead: Iterable[int] = cfg.eval.date_bins_ahead + date_bins_behind: Iterable[int] = cfg.eval.date_bins_behind # Drop date_bins_direction if they are further away than min_lookdirection_days if cfg.data.min_lookbehind_days: date_bins_behind = [ - b for b in date_bins_behind if cfg.data.min_lookbehind_days < b + b for b in date_bins_behind if cfg.data.min_lookbehind_days > b ] if cfg.data.min_lookahead_days: date_bins_ahead = [ - b for b in date_bins_ahead if cfg.data.min_lookahead_days < abs(b) + b for b in date_bins_ahead if cfg.data.min_lookahead_days > abs(b) ] # Invert date_bins_behind to negative if it's not already @@ -150,7 +126,7 @@ def evaluate_model( pred_proba_thresholds = positive_rate_to_pred_probs( pred_probs=y_hat_probs, - positive_rate_thresholds=cfg.evaluation.positive_rate_thresholds, + positive_rate_thresholds=cfg.eval.positive_rate_thresholds, ) msg.info(f"AUC: {auc}") @@ -165,7 +141,7 @@ def evaluate_model( performance_by_threshold_df = generate_performance_by_positive_rate_table( labels=y, pred_probs=y_hat_probs, - positive_rate_thresholds=cfg.evaluation.positive_rate_thresholds, + positive_rate_thresholds=cfg.eval.positive_rate_thresholds, pred_proba_thresholds=pred_proba_thresholds, ids=eval_df[cfg.data.id_col_name], pred_timestamps=pred_timestamps, diff --git a/src/psycopt2d/load.py b/src/psycopt2d/load.py index 65d4c34c..41958499 100644 --- a/src/psycopt2d/load.py +++ b/src/psycopt2d/load.py @@ -1,70 +1,27 @@ """Loader for the t2d dataset.""" +import os import re from collections.abc import Iterable -from datetime import datetime, timedelta +from datetime import timedelta from pathlib import Path -from typing import Any, Optional, Union +from typing import Optional, Union import pandas as pd -from omegaconf import DictConfig, OmegaConf from psycopmlutils.sql.loader import sql_load -from pydantic import BaseModel, Field +from pydantic import BaseModel from wasabi import Printer -from psycopt2d.utils import PROJECT_ROOT, coerce_to_datetime +from psycopt2d.evaluate_saved_model_predictions import infer_look_distance +from psycopt2d.utils.configs import FullConfig +from psycopt2d.utils.utils import ( + get_percent_lost, + infer_outcome_col_name, + infer_predictor_col_name, +) msg = Printer(timestamp=True) -class DatasetTimeSpecification(BaseModel): - """Specification of the time range of the dataset.""" - - drop_patient_if_outcome_before_date: Optional[Union[str, datetime]] = Field( - 
description="""If a patient experiences the outcome before this date, all their prediction times will be dropped. - Used for wash-in, to avoid including patients who were probably already experiencing the outcome before the study began.""", - ) - - min_prediction_time_date: Optional[Union[str, datetime]] = Field( - description="""Any prediction time before this date will be dropped.""", - ) - - min_lookbehind_days: Optional[Union[int, float]] = Field( - description="""If the distance from the prediction time to the start of the dataset is less than this, the prediction time will be dropped""", - ) - - min_lookahead_days: Optional[Union[int, float]] = Field( - description="""If the distance from the prediction time to the end of the dataset is less than this, the prediction time will be dropped""", - ) - - lookbehind_combination: Optional[list[Union[int, float]]] = Field( - description="""List containing a combination of lookbehind windows (e.g. [30, 60, 90]) which determines which features to keep in the dataset. E.g. for the above list, only features with lookbehinds of 30, 60 or 90 days will be kept.""", - ) - - -class DatasetSpecification(BaseModel): - """Specification for loading a dataset.""" - - split_dir_path: Union[str, Path] = Field( - description="""Path to the directory containing the split files.""", - ) - - file_suffix: str = Field( - description="""Suffix of the split files. E.g. 'parquet' or 'csv'.""", - default="parquet", - ) - - time: DatasetTimeSpecification - - pred_col_name_prefix: str = Field( - default="pred_", - description="""Prefix for the prediction column names.""", - ) - pred_time_colname: str = Field( - default="timestamp", - description="""Column name for with timestamps for prediction times""", - ) - - def load_timestamp_for_any_diabetes(): """Loads timestamps for the broad definition of diabetes used for wash-in. @@ -105,16 +62,16 @@ class DataLoader: def __init__( self, - spec: DatasetSpecification, + cfg: FullConfig, ): - self.spec = spec + self.cfg = cfg # File handling - self.dir_path = Path(spec.split_dir_path) - self.file_suffix = spec.file_suffix + self.dir_path = Path(cfg.data.dir) + self.file_suffix = cfg.data.suffix # Column specifications - self.pred_col_name_prefix = spec.pred_col_name_prefix + self.pred_col_name_prefix = cfg.data.pred_col_name_prefix def _load_dataset_file( # pylint: disable=inconsistent-return-statements self, @@ -168,7 +125,7 @@ def _drop_rows_if_datasets_ends_within_days( pd.DataFrame: Dataset with dropped rows. 
""" if not isinstance(n_days, timedelta): - n_days = timedelta(days=n_days) # type: ignore + n_days_timedelt: timedelta = timedelta(days=n_days) # type: ignore if direction not in ("ahead", "behind"): raise ValueError(f"Direction {direction} not supported.") @@ -176,54 +133,61 @@ def _drop_rows_if_datasets_ends_within_days( n_rows_before_modification = dataset.shape[0] if direction == "ahead": - max_datetime = dataset[self.spec.pred_time_colname].max() - n_days - before_max_dt = dataset[self.spec.pred_time_colname] < max_datetime + max_datetime = ( + dataset[self.cfg.data.pred_timestamp_col_name].max() - n_days_timedelt + ) + before_max_dt = ( + dataset[self.cfg.data.pred_timestamp_col_name] < max_datetime + ) dataset = dataset[before_max_dt] elif direction == "behind": - min_datetime = dataset[self.spec.pred_time_colname].min() + n_days - after_min_dt = dataset[self.spec.pred_time_colname] > min_datetime + min_datetime = ( + dataset[self.cfg.data.pred_timestamp_col_name].min() + n_days_timedelt + ) + after_min_dt = dataset[self.cfg.data.pred_timestamp_col_name] > min_datetime dataset = dataset[after_min_dt] n_rows_after_modification = dataset.shape[0] - percent_dropped = ( - n_rows_before_modification - n_rows_after_modification - ) / n_rows_before_modification - - msg.info( - f"Dropped {n_rows_before_modification - n_rows_after_modification} ({percent_dropped}%) rows because the end of the dataset was within {n_days} of their prediction time when looking {direction} from their prediction time", + percent_dropped = get_percent_lost( + n_before=n_rows_before_modification, + n_after=n_rows_after_modification, ) + if n_rows_before_modification - n_rows_after_modification != 0: + msg.info( + f"Dropped {n_rows_before_modification - n_rows_after_modification} ({percent_dropped}%) rows because the end of the dataset was within {n_days} of their prediction time when looking {direction} from their prediction time", + ) + return dataset - def _drop_patients_with_event_in_washin(self, dataset) -> pd.DataFrame: + def drop_patient_if_outcome_before_date( + self, + dataset: pd.DataFrame, + ) -> pd.DataFrame: """Drop patients within washin period.""" n_rows_before_modification = dataset.shape[0] - # Remove dates before drop_patient_if_outcome_before_date outcome_before_date = ( - dataset["timestamp_first_diabetes_any"] - < self.spec.time.drop_patient_if_outcome_before_date + dataset[self.cfg.data.outcome_timestamp_col_name] + < self.cfg.data.drop_patient_if_outcome_before_date ) patients_to_drop = set(dataset["dw_ek_borger"][outcome_before_date].unique()) dataset = dataset[~dataset["dw_ek_borger"].isin(patients_to_drop)] - # Removed dates before drop_patient_if_outcome_before_date - dataset = dataset[ - dataset[self.spec.pred_time_colname] - > self.spec.time.drop_patient_if_outcome_before_date - ] - n_rows_after_modification = dataset.shape[0] - percent_dropped = ( - n_rows_before_modification - n_rows_after_modification - ) / n_rows_before_modification - msg.info( - f"Dropped {n_rows_before_modification - n_rows_after_modification} ({percent_dropped}%) rows because patients had diabetes in the washin period.", + percent_dropped = get_percent_lost( + n_before=n_rows_after_modification, + n_after=n_rows_after_modification, ) + if n_rows_before_modification - n_rows_after_modification != 0: + msg.info( + f"Dropped {n_rows_before_modification - n_rows_after_modification} ({percent_dropped}%) rows because patients had diabetes in the washin period.", + ) + return dataset def 
_drop_cols_not_in_lookbehind_combination( @@ -240,33 +204,49 @@ def _drop_cols_not_in_lookbehind_combination( pd.DataFrame: Dataset with dropped columns. """ + if not self.cfg.data.lookbehind_combination: + raise ValueError("No lookbehind_combination provided.") + # Extract all unique lookbhehinds in the dataset predictors lookbehinds_in_dataset = { - int(re.findall(r"within_(\d+)_days", col)[0]) - for col in dataset.columns - if self.pred_col_name_prefix in col + int(infer_look_distance(col)[0]) + for col in infer_predictor_col_name(df=dataset) + if len(infer_look_distance(col)) > 0 } + # Convert list to set + lookbehinds_in_spec = set(self.cfg.data.lookbehind_combination) + # Check that all loobehinds in lookbehind_combination are used in the predictors - if not set(self.spec.time.lookbehind_combination).issubset( + if not lookbehinds_in_spec.issubset( lookbehinds_in_dataset, ): - raise ValueError( - f"One or more of the provided lookbehinds in lookbehind_combination is/are not used in any predictors in the dataset. Lookbehinds in dataset: {lookbehinds_in_dataset}. Lookbehinds in lookbehind_combination: {self.spec.time.lookbehind_combination}.", + msg.warn( + f"One or more of the provided lookbehinds in lookbehind_combination is/are not used in any predictors in the dataset: {lookbehinds_in_spec - lookbehinds_in_dataset}", ) + lookbehinds_to_keep = lookbehinds_in_spec.intersection( + lookbehinds_in_dataset, + ) + + if not lookbehinds_to_keep: + raise ValueError("No predictors left after dropping lookbehinds.") + + msg.warn(f"Training on {lookbehinds_to_keep}.") + else: + lookbehinds_to_keep = lookbehinds_in_spec + # Create a list of all predictor columns who have a lookbehind window not in lookbehind_combination list cols_to_drop = [ - col - for col in dataset.columns - if any( - str(x) not in col and self.pred_col_name_prefix in col - for x in self.spec.time.lookbehind_combination - ) + c + for c in infer_predictor_col_name(df=dataset) + if all(str(l_beh) not in c for l_beh in lookbehinds_to_keep) ] - dataset = dataset.drop(columns=cols_to_drop) + cols_to_drop = [c for c in cols_to_drop if "within" in c] + # ? Add some specification of within_x_days indicating how to parse columns to find lookbehinds. Or, alternatively, use the column spec. 
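The open question in the comment above (how to decide which columns belong to which lookbehind) can be handled by parsing the day count out of each column name and intersecting it with the configured combination. A sketch with invented column names, not the project's method:

```python
import re
from typing import Optional

lookbehind_combination = {30, 90, 730}  # stands in for cfg.data.lookbehind_combination

predictor_cols = [  # invented examples following the pred_*_within_X_days convention
    "pred_a_within_30_days",
    "pred_a_within_180_days",
    "pred_b_within_730_days",
    "pred_b_within_3650_days",
]


def lookbehind_of(col: str) -> Optional[int]:
    match = re.search(r"within_(\d+)_days", col)
    return int(match.group(1)) if match else None


lookbehinds_in_dataset = {lookbehind_of(c) for c in predictor_cols} - {None}

# Only train on lookbehinds that are both requested and actually present.
lookbehinds_to_keep = lookbehind_combination & lookbehinds_in_dataset

cols_to_drop = [c for c in predictor_cols if lookbehind_of(c) not in lookbehinds_to_keep]
print(cols_to_drop)  # ['pred_a_within_180_days', 'pred_b_within_3650_days']
```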
+ dataset = dataset.drop(columns=cols_to_drop) return dataset def _convert_timestamp_dtype_and_nat(self, dataset: pd.DataFrame) -> pd.DataFrame: @@ -311,9 +291,7 @@ def _drop_cols_if_exceeds_look_direction_threshold( n_cols_before_modification = dataset.shape[1] if direction == "behind": - cols_to_process = [ - c for c in dataset.columns if self.pred_col_name_prefix in c - ] + cols_to_process = infer_predictor_col_name(df=dataset) for col in cols_to_process: # Extract lookbehind days from column name use regex @@ -324,20 +302,23 @@ def _drop_cols_if_exceeds_look_direction_threshold( if len(lookbehind_days_strs) > 0: lookbehind_days = int(lookbehind_days_strs[0]) else: - raise ValueError(f"Could not extract lookbehind days from {col}") + msg.warn(f"Could not extract lookbehind days from {col}") + continue if lookbehind_days > look_direction_threshold: cols_to_drop.append(col) n_cols_after_modification = dataset.shape[1] - percent_dropped = ( - n_cols_before_modification - n_cols_after_modification - ) / n_cols_before_modification - - msg.info( - f"Dropped {n_cols_before_modification - n_cols_after_modification} ({percent_dropped}%) columns because they were looking {direction} further out than {look_direction_threshold} days.", + percent_dropped = get_percent_lost( + n_before=n_cols_before_modification, + n_after=n_cols_after_modification, ) + if n_cols_before_modification - n_cols_after_modification != 0: + msg.info( + f"Dropped {n_cols_before_modification - n_cols_after_modification} ({percent_dropped}%) columns because they were looking {direction} further out than {look_direction_threshold} days.", + ) + return dataset[[c for c in dataset.columns if c not in cols_to_drop]] def _drop_cols_and_rows_if_look_direction_not_met( @@ -357,10 +338,10 @@ def _drop_cols_and_rows_if_look_direction_not_met( for direction in ("ahead", "behind"): if direction in ("ahead", "behind"): - if self.spec.time.min_lookahead_days: - n_days = self.spec.time.min_lookahead_days - elif self.spec.time.min_lookbehind_days: - n_days = self.spec.time.min_lookbehind_days + if direction == "ahead": + n_days = self.cfg.data.min_lookahead_days + elif direction == "behind": + n_days = self.cfg.data.min_lookbehind_days else: continue @@ -378,6 +359,35 @@ def _drop_cols_and_rows_if_look_direction_not_met( return dataset + def _keep_unique_outcome_col_with_lookahead_days_matching_conf( + self, + dataset: pd.DataFrame, + ) -> pd.DataFrame: + """Keep only one outcome column with the same lookahead days as set in + the config.""" + outcome_cols = infer_outcome_col_name(df=dataset, allow_multiple=True) + + col_to_drop = [ + c for c in outcome_cols if str(self.cfg.data.min_lookahead_days) not in c + ] + + # If no columns to drop, return the dataset + if not col_to_drop: + return dataset + + df = dataset.drop(col_to_drop, axis=1) + + if not len(infer_outcome_col_name(df)) == 1: + raise ValueError( + "Returning more than one outcome column, will cause problems during eval.", + ) + + return df + + def n_outcome_col_names(self, df: pd.DataFrame): + """How many outcome columns there are in a dataframe.""" + return len(infer_outcome_col_name(df=df, allow_multiple=True)) + def _process_dataset(self, dataset: pd.DataFrame) -> pd.DataFrame: """Process dataset, namely: @@ -389,25 +399,27 @@ def _process_dataset(self, dataset: pd.DataFrame) -> pd.DataFrame: Returns: pd.DataFrame: Processed dataset """ - if self.spec.time.drop_patient_if_outcome_before_date: - dataset = add_washin_timestamps(dataset=dataset) - dataset = 
self._convert_timestamp_dtype_and_nat(dataset) - if self.spec.time.drop_patient_if_outcome_before_date: - dataset = self._drop_patients_with_event_in_washin(dataset=dataset) + + if self.cfg.data.drop_patient_if_outcome_before_date: + dataset = self.drop_patient_if_outcome_before_date(dataset=dataset) # Drop if later than min prediction time date - if self.spec.time.min_prediction_time_date: + if self.cfg.data.min_prediction_time_date: dataset = dataset[ - dataset[self.spec.pred_time_colname] - > self.spec.time.min_prediction_time_date + dataset[self.cfg.data.pred_timestamp_col_name] + > self.cfg.data.min_prediction_time_date ] dataset = self._drop_cols_and_rows_if_look_direction_not_met(dataset=dataset) - if self.spec.time.lookbehind_combination: + if self.cfg.data.lookbehind_combination: dataset = self._drop_cols_not_in_lookbehind_combination(dataset=dataset) + dataset = self._keep_unique_outcome_col_with_lookahead_days_matching_conf( + dataset=dataset, + ) + return dataset def load_dataset_from_dir( @@ -425,22 +437,7 @@ def load_dataset_from_dir( Returns: pd.DataFrame: The filtered dataset """ - # Handle input types - for timedelta_arg in ( - self.spec.time.min_lookbehind_days, - self.spec.time.min_lookahead_days, - ): - if timedelta_arg: - timedelta_arg = timedelta(days=timedelta_arg) # type: ignore - - for date_arg in ( - self.spec.time.drop_patient_if_outcome_before_date, - self.spec.time.min_prediction_time_date, - ): - if isinstance(date_arg, str): - date_arg = coerce_to_datetime( - date_repr=date_arg, - ) + msg.info(f"Loading {split_names}") # Concat splits if multiple are given if isinstance(split_names, (list, tuple)): @@ -468,42 +465,6 @@ def load_dataset_from_dir( return dataset -def _init_spec_from_cfg( - cfg: DictConfig, -) -> DatasetSpecification: - """Initialise a feature spec from a DictConfig.""" - data_cfg: dict[str, Any] = OmegaConf.to_container( # type: ignore - cfg.data, - resolve=True, - ) - - if data_cfg["source"] == "synthetic": - split_dir_path = PROJECT_ROOT / "tests" / "test_data" / "synth_splits" - file_suffix = "csv" - else: - split_dir_path = data_cfg["dir"] - file_suffix = data_cfg["source"] - - time_spec = DatasetTimeSpecification( - drop_patient_if_outcome_before_date=data_cfg[ - "drop_patient_if_outcome_before_date" - ], - min_lookahead_days=data_cfg["min_lookahead_days"], - min_lookbehind_days=data_cfg["min_lookbehind_days"], - min_prediction_time_date=data_cfg["min_prediction_time_date"], - lookbehind_combination=data_cfg["lookbehind_combination"], - ) - - return DatasetSpecification( - split_dir_path=split_dir_path, - pred_col_name_prefix=data_cfg["pred_col_name_prefix"], - file_suffix=file_suffix, - pred_time_colname=data_cfg["pred_timestamp_col_name"], - n_training_samples=data_cfg["n_training_samples"], - time=time_spec, - ) - - class SplitDataset(BaseModel): """A dataset split into train, test and optionally validation.""" @@ -517,16 +478,29 @@ class Config: val: pd.DataFrame -def load_train_and_val_from_cfg(cfg: DictConfig): - """Load train and validation data from file.""" +def load_train_from_cfg(cfg: FullConfig) -> pd.DataFrame: + """Load train dataset from config. 
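`_keep_unique_outcome_col_with_lookahead_days_matching_conf` above keeps exactly one `outc_` column, selected by whether the configured `min_lookahead_days` appears in the column name. In isolation the rule looks like the sketch below; the column names are invented.

```python
min_lookahead_days = 1825  # stands in for cfg.data.min_lookahead_days

outcome_cols = [  # invented examples
    "outc_t2d_within_365_days",
    "outc_t2d_within_1825_days",
]

kept = [c for c in outcome_cols if str(min_lookahead_days) in c]

if len(kept) != 1:
    raise ValueError(f"Expected exactly one outcome column, got {kept}")

print(kept[0])  # outc_t2d_within_1825_days
```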
- data_specification = _init_spec_from_cfg( - cfg, - ) + Args: + cfg (FullConfig): Config + + Returns: + pd.DataFrame: Train dataset + """ + return DataLoader(cfg=cfg).load_dataset_from_dir(split_names="train") - split = DataLoader(spec=data_specification) + +def load_train_and_val_from_cfg(cfg: FullConfig): + """Load train and validation data from file.""" + + loader = DataLoader(cfg=cfg) return SplitDataset( - train=split.load_dataset_from_dir(split_names="train"), - val=split.load_dataset_from_dir(split_names="val"), + train=loader.load_dataset_from_dir(split_names="train"), + val=loader.load_dataset_from_dir(split_names="val"), ) + + +def get_latest_dataset_dir(path: Path) -> Path: + """Get the latest dataset directory by time of creation.""" + return max(path.glob("*"), key=os.path.getctime) diff --git a/src/psycopt2d/model_performance/model_performance.py b/src/psycopt2d/model_performance/model_performance.py index e3f887da..805ff62b 100644 --- a/src/psycopt2d/model_performance/model_performance.py +++ b/src/psycopt2d/model_performance/model_performance.py @@ -28,16 +28,17 @@ class ModelPerformance: """Evaluators of model performance.""" + @staticmethod def performance_metrics_from_df( prediction_df: pd.DataFrame, prediction_col_name: str, label_col_name: str, - id_col_name: Optional[str] = None, + id_col_name: str = None, metadata_col_names: Optional[list[str]] = None, id2label: Optional[ # pylint: disable=redefined-outer-name dict[int, str] ] = None, - to_wide: Optional[bool] = False, + to_wide: bool = False, binary_threshold: Optional[float] = 0.5, ) -> pd.DataFrame: """Calculate performance metrics from a dataframe. @@ -49,7 +50,7 @@ def performance_metrics_from_df( prediction_df (pd.DataFrame): Dataframe with 1 row per prediction. prediction_col_name (str): column containing probabilities for each class or a list of floats for binary classification. label_col_name (str): column containing ground truth label - id_col_name (str, optional): Column name for the id, used for grouping. + id_col_name (str): Column name for the id, used for grouping. metadata_col_names (Optional[list[str]], optional): Column(s) containing metadata to add to the performance dataframe. Each column should only contain 1 unique value. E.g. model_name, modality.. If set to "all" will auto-detect metadata columns and add them all. @@ -61,8 +62,6 @@ def performance_metrics_from_df( pd.Dataframe: Dataframe with performance metrics. """ - concat_axis = 1 if to_wide else 0 - performance_description = ModelPerformance._evaluate_single_model( prediction_df=prediction_df, aggregate_by_id=False, @@ -93,6 +92,8 @@ def performance_metrics_from_df( binary_threshold=binary_threshold, ) + concat_axis = 1 if to_wide else 0 + performance_description = pd.concat( [performance_description, performance_by_id], axis=concat_axis, @@ -113,6 +114,7 @@ def performance_metrics_from_df( return performance_description + @staticmethod def performance_metrics_from_file( jsonl_path: Union[str, Path], prediction_col_name: str, @@ -122,7 +124,7 @@ def performance_metrics_from_file( id2label: Optional[ # pylint: disable=redefined-outer-name dict[int, str] ] = None, - to_wide: Optional[bool] = False, + to_wide: bool = False, binary_threshold: Optional[float] = 0.5, ) -> pd.DataFrame: """Load a .jsonl file and returns performance metrics. 
@@ -214,6 +216,7 @@ def performance_metrics_from_folder( ] return pd.concat(dfs) + @staticmethod def _evaluate_single_model( # pylint: disable=too-many-locals prediction_df: pd.DataFrame, aggregate_by_id: bool, @@ -365,9 +368,7 @@ def compute_metrics( """ # sorting to get correct output from f1, prec, and recall groups = sorted(set(labels)) - performance = {} - - performance["acc-overall"] = accuracy_score(labels, predicted) + performance = {"acc-overall": accuracy_score(labels, predicted)} performance["balanced_accuracy-overall"] = balanced_accuracy_score( labels, predicted, diff --git a/src/psycopt2d/model_training_watcher.py b/src/psycopt2d/model_training_watcher.py index 5ac58127..0de379af 100644 --- a/src/psycopt2d/model_training_watcher.py +++ b/src/psycopt2d/model_training_watcher.py @@ -2,17 +2,20 @@ import argparse import subprocess import time +from collections import defaultdict from distutils.util import strtobool # pylint: disable=deprecated-module from pathlib import Path -from typing import Optional +from typing import Any, Optional, Union import wandb +from pydantic import BaseModel from wandb.apis.public import Api # pylint: disable=no-name-in-module from wandb.sdk.wandb_run import Run # pylint: disable=no-name-in-module from wasabi import msg +from psycopt2d.configs import ModelEvalData from psycopt2d.evaluation import evaluate_model -from psycopt2d.utils import ( +from psycopt2d.utils.utils import ( MODEL_PREDICTIONS_PATH, PROJECT_ROOT, infer_outcome_col_name, @@ -20,11 +23,34 @@ load_evaluation_data, ) -# Path to the wandb directory WANDB_DIR = PROJECT_ROOT / "wandb" -class ModelTrainingWatcher: +class RunInformation(BaseModel): + """Information about a wandb run.""" + + # Attributes must be optional since runs can be uploaded, + # without having been sufficiently validated. + run_id: str + auc: Optional[float] + lookbehind_days: Optional[Union[int, list[int]]] + lookahead_days: Optional[int] + + # Concatenated lookbehind_days and lookahead_days string for + # max_performances scoreboard. Allows to only fully evaluate + # the models that set new high scores. + lookahead_lookbehind_combined: Optional[str] = None + + def __init__(self, **kwargs): + super().__init__(**kwargs) + if ( + self.lookahead_lookbehind_combined is None + and self.lookbehind_days is not None + ): + self.lookahead_lookbehind_combined = f"lookahead:{str(self.lookahead_days)}_lookbehind:{str(self.lookbehind_days)}" + + +class ModelTrainingWatcher: # pylint: disable=too-many-instance-attributes """Watch the wandb directory for new files and uploads them to wandb. Fully evaluates the best runs after a certain number of runs have been uploaded. @@ -36,6 +62,7 @@ class ModelTrainingWatcher: model_data_dir: Where to look for evaluation results. overtaci: Whether the script is running on overtaci. Determines where to look for the evaluation results. + verbose: Whether to print verbose output. 
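The `RunInformation` model above concatenates the two look distances into a single key; together with the `defaultdict` scoreboard used a bit further down, that key is what lets the watcher fully evaluate only runs that set a new best AUC for their look-distance group. A trimmed-down sketch with invented field values, not the watcher's own classes:

```python
from collections import defaultdict
from typing import Optional, Union

from pydantic import BaseModel


class RunInfo(BaseModel):
    """Stand-in for the watcher's RunInformation model."""

    run_id: str
    auc: Optional[float]
    lookbehind_days: Optional[Union[int, list[int]]]
    lookahead_days: Optional[int]

    @property
    def group_key(self) -> str:
        # Same idea as lookahead_lookbehind_combined: one scoreboard entry
        # per (lookahead, lookbehind) combination.
        return f"lookahead:{self.lookahead_days}_lookbehind:{self.lookbehind_days}"


max_performances: dict[str, float] = defaultdict(lambda: 0.0)

runs = [
    RunInfo(run_id="a", auc=0.71, lookbehind_days=730, lookahead_days=1825),
    RunInfo(run_id="b", auc=0.69, lookbehind_days=730, lookahead_days=1825),
    RunInfo(run_id="c", auc=0.74, lookbehind_days=90, lookahead_days=1825),
]

for run in runs:
    if run.auc is not None and run.auc > max_performances[run.group_key]:
        max_performances[run.group_key] = run.auc
        print(f"New record for {run.group_key}: AUC {run.auc} (run {run.run_id})")
```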
""" def __init__( @@ -44,6 +71,7 @@ def __init__( project_name: str, n_runs_before_eval: int, model_data_dir: Path, + verbose: bool = False, ): self.entity = entity self.project_name = project_name @@ -51,12 +79,14 @@ def __init__( self.n_runs_before_eval = n_runs_before_eval + self.verbose = verbose # A queue for runs waiting to be uploaded to WandB - self.run_id_upload_queue = [] - self.max_performance = 0 + self.run_id_eval_candidates_queue: list[str] = [] + # max performance by lookbehind/-ahead combination + self.max_performances: dict[str, float] = defaultdict(lambda: 0.0) self.archive_path = WANDB_DIR / "archive" - self.archive_path.mkdir(exist_ok=True) + self.archive_path.mkdir(exist_ok=True, parents=True) def watch(self, timeout_minutes: Optional[int] = None) -> None: """Watch the wandb directory for new runs. @@ -70,45 +100,38 @@ def watch(self, timeout_minutes: Optional[int] = None) -> None: timeout_minutes is None or start_time + timeout_minutes * 60 > time.time() ): self.get_new_runs_and_evaluate() - time.sleep(10) - - def get_new_runs_and_evaluate(self) -> None: - """Get new runs and evaluate the best runs.""" - self.upload_unarchived_runs() - if len(self.run_id_upload_queue) >= self.n_runs_before_eval: - self.evaluate_best_runs() - - def _upload_run_dir(self, run_dir: Path) -> None: - """Upload a single run to wandb.""" - subprocess.run( - ["wandb", "sync", str(run_dir), "--project", self.project_name], - check=True, - ) + time.sleep(1) def _archive_run_dir(self, run_dir: Path) -> None: """Move a run to the archive folder.""" - run_dir.rename(self.archive_path / run_dir.name) + run_dir.rename(target=self.archive_path / run_dir.name) def _get_run_id(self, run_dir: Path) -> str: """Get the run id from a run directory.""" return run_dir.name.split("-")[-1] - def upload_unarchived_runs(self) -> None: - """Upload unarchived runs to wandb.""" - for run_folder in WANDB_DIR.glob(r"offline-run*"): - run_id = self._get_run_id(run_folder) + def _upload_run_dir(self, run_dir: Path) -> str: + """Upload a single run to wandb.""" + # get stdout from subprocess.run + proc = subprocess.run( + ["wandb", "sync", str(run_dir), "--project", self.project_name], + check=True, + capture_output=True, + ) - self._upload_run_dir(run_folder) - self._archive_run_dir(run_folder) - self.run_id_upload_queue.append(run_id) + stdout = proc.stdout.decode("utf-8") + + if self.verbose: + msg.info(f"Watcher: {stdout}") + return stdout - def _get_run_evaluation_dir(self, run_id: str) -> Path: + def _get_run_evaluation_data_dir(self, run_id: str) -> Path: """Get the evaluation path for a single run.""" return list(self.model_data_dir.glob(f"*{run_id}*"))[0] - def _get_eval_data(self, run_id: str) -> tuple: + def _get_eval_data(self, run_id: str) -> ModelEvalData: """Get the evaluation data for a single run.""" - run_eval_dir = self._get_run_evaluation_dir(run_id) + run_eval_dir = self._get_run_evaluation_data_dir(run_id) return load_evaluation_data(run_eval_dir) @@ -117,10 +140,10 @@ def _do_evaluation(self, run_id: str) -> None: # get evaluation data eval_data = self._get_eval_data(run_id) # infer required column names - y_col_name = infer_outcome_col_name(df=eval_data.df, prefix="outc_") - y_hat_prob_col_name = infer_y_hat_prob_col_name(df=eval_data.df) + y_col_name = infer_outcome_col_name(df=eval_data.df, prefix="outc_")[0] + y_hat_prob_col_name = infer_y_hat_prob_col_name(df=eval_data.df)[0] # get wandb run - run = wandb.init(project=self.project_name, entity=self.entity, id=run_id) + run: Run = 
wandb.init(project=self.project_name, entity=self.entity, id=run_id) # type: ignore # run evaluation evaluate_model( @@ -137,38 +160,118 @@ def _get_wandb_run(self, run_id: str) -> Run: """Get the wandb run object from the run id.""" return Api().run(f"{self.entity}/{self.project_name}/{run_id}") - def _get_run_performance(self, run_id: str) -> float: - """Get the performance of a single run and check if it failed.""" - run = self._get_wandb_run(run_id) - if "roc_auc_unweighted" in run.summary: - return run.summary.roc_auc_unweighted - msg.info( - f"Run {run_id} has no performance metric. Pinging again at next eval time.", - ) + def _get_run_wandb_dir(self, run_id: str) -> Path: + return list(WANDB_DIR.glob(f"*offline-run*{run_id}*"))[0] + + def _get_run_attribute(self, run: Run, attribute: str) -> Any: + """Get an attribute from a wandb run.""" + if attribute in run.summary: + return run.summary[attribute] + if self.verbose: + msg.info( + f"Run {run.id} has no attribute {attribute}. Pinging again at next eval time.", + ) return None - def evaluate_best_runs(self) -> None: - """Evaluate the best runs.""" - run_performances = { - run_id: self._get_run_performance(run_id) - for run_id in self.run_id_upload_queue - } - # sort runs by performance to not upload subpar runs - run_performances = dict( - sorted(run_performances.items(), key=lambda item: item[1], reverse=True), + def _evaluate_and_archive_finished_runs( + self, + run_information: list[RunInformation], + ) -> None: + """Evaluate the finished runs. + + Test their performance against the current maximum for each + lookbehind/-ahead days, and fully evaluate the best performing. + Move all wandb run dirs to the archive folder. + """ + finished_runs: list[RunInformation] = [ + run_info + for run_info in run_information + if run_info.auc and run_info.lookahead_lookbehind_combined + ] + # sort to only upload the best in in each group + finished_runs.sort( + key=lambda run_info: ( + run_info.lookahead_lookbehind_combined, + run_info.auc, + ), + reverse=True, ) - # get runs with auc of None (attempted upload before run finished) - unfinished_runs = [ - run_id for run_id, auc in run_performances.items() if auc is None + + if finished_runs: + for run_info in finished_runs: + if ( + run_info.auc # type: ignore + > self.max_performances[ + run_info.lookahead_lookbehind_combined # type: ignore + ] + ): + msg.good( + f"New record performance for {run_info.lookahead_lookbehind_combined}! 
AUC: {run_info.auc}", + ) + self.max_performances[ + run_info.lookahead_lookbehind_combined # type: ignore + ] = run_info.auc # type: ignore + self._do_evaluation(run_info.run_id) + self._archive_run_dir(run_dir=self._get_run_wandb_dir(run_info.run_id)) + + def _get_unfinished_run_ids( + self, + run_information: list[RunInformation], + ) -> list[str]: + """Get the run ids of the unfinished runs.""" + return [run_info.run_id for run_info in run_information if run_info.auc is None] + + def _get_run_information(self, run_id: str) -> RunInformation: + """Get the run information for a single run.""" + run = self._get_wandb_run(run_id) + return RunInformation( + run_id=run_id, + auc=self._get_run_attribute(run, "roc_auc_unweighted"), + lookbehind_days=self._get_run_attribute(run, "lookbehind"), + lookahead_days=self._get_run_attribute(run, "lookahead"), + ) + + def _get_run_information_for_all_in_queue(self): + """Get the performance and information of all runs in the evaluation + queue.""" + return [ + self._get_run_information(run_id) + for run_id in self.run_id_eval_candidates_queue ] - for run_id, performance in run_performances.items(): - if performance > self.max_performance: - msg.good(f"New record performance! AUC: {performance}") - self.max_performance = performance - self._do_evaluation(run_id) - # reset run id queue and try to upload unfinished runs next time - self.run_id_upload_queue = unfinished_runs + def get_new_runs_and_evaluate(self) -> None: + """Get new runs and evaluate the best runs.""" + self.upload_unarchived_runs() + + if len(self.run_id_eval_candidates_queue) >= self.n_runs_before_eval: + run_infos = self._get_run_information_for_all_in_queue() + self.run_id_eval_candidates_queue = self._get_unfinished_run_ids( + run_information=run_infos, + ) + self._evaluate_and_archive_finished_runs(run_information=run_infos) + + def upload_unarchived_runs(self) -> None: + """Upload unarchived runs to wandb. Only adds runs that have finished + training to the evaluation queue. + + Raises: + ValueError: If wandb sync failed + """ + for run_folder in WANDB_DIR.glob(r"offline-run*"): + run_id = self._get_run_id(run_folder) + + wandb_sync_stdout = self._upload_run_dir(run_folder) + + if "... done" not in wandb_sync_stdout: + if ".wandb file is empty" not in wandb_sync_stdout: + raise ValueError( + f"wandb sync failed, returned: {wandb_sync_stdout}", + ) + if self.verbose: + msg.warn(f"Run {run_id} is still running. 
Skipping.") + continue + + self.run_id_eval_candidates_queue.append(run_id) def archive_all_runs(self) -> None: """Archive all runs in the wandb directory.""" @@ -215,6 +318,13 @@ def float_or_none(arg: str) -> Optional[float]: help="Archive all runs in the wandb dir before starting", required=True, ) + parser.add_argument( + "--verbose", + type=lambda x: bool(strtobool(x)), + help="Whether to print verbose messages (default: False)", + required=False, + default=False, + ) args = parser.parse_args() model_data_dir = ( @@ -228,9 +338,11 @@ def float_or_none(arg: str) -> Optional[float]: project_name=args.project_name, n_runs_before_eval=args.n_runs_before_eval, model_data_dir=model_data_dir, + verbose=args.verbose, ) + if args.clean_wandb_dir: watcher.archive_all_runs() - msg.info("Starting WandB watcher") + msg.info("Watcher: Starting WandB watcher") watcher.watch(timeout_minutes=args.timeout) diff --git a/src/psycopt2d/tables/performance_by_threshold.py b/src/psycopt2d/tables/performance_by_threshold.py index 688faaa3..f2cb7a25 100644 --- a/src/psycopt2d/tables/performance_by_threshold.py +++ b/src/psycopt2d/tables/performance_by_threshold.py @@ -1,5 +1,5 @@ """Get performance by which threshold is used to classify positive.""" -from collections.abc import Iterable +from collections.abc import Iterable, Sequence from typing import Optional, Union import numpy as np @@ -9,8 +9,8 @@ def performance_by_threshold( # pylint: disable=too-many-locals - labels: Iterable[int], - pred_probs: Iterable[float], + labels: Sequence[int], + pred_probs: Sequence[float], positive_threshold: float, round_to: int = 4, ) -> pd.DataFrame: @@ -26,7 +26,7 @@ def performance_by_threshold( # pylint: disable=too-many-locals Returns: pd.DataFrame """ - preds = np.where(pred_probs > positive_threshold, 1, 0) + preds = np.where(pred_probs > positive_threshold, 1, 0) # type: ignore conf_matrix = confusion_matrix(labels, preds) @@ -141,33 +141,31 @@ def days_from_first_positive_to_diagnosis( ] ] - warning_days = df["warning_days"].agg(aggregation_method) - - return warning_days + return df["warning_days"].agg(aggregation_method) def generate_performance_by_positive_rate_table( - labels: Iterable[int], - pred_probs: Iterable[float], - positive_rate_thresholds: Iterable[Union[int, float]], - pred_proba_thresholds: Iterable[float], - ids: Iterable[Union[int, float]], - pred_timestamps: Iterable[pd.Timestamp], - outcome_timestamps: Iterable[pd.Timestamp], + labels: Sequence[int], + pred_probs: Sequence[float], + positive_rate_thresholds: Sequence[Union[int, float]], + pred_proba_thresholds: Sequence[float], + ids: Sequence[Union[int, float]], + pred_timestamps: Sequence[pd.Timestamp], + outcome_timestamps: Sequence[pd.Timestamp], output_format: Optional[str] = "wandb_table", ) -> Union[pd.DataFrame, str]: """Generates a performance_by_threshold table as either a DataFrame or html object. Args: - labels (Iterable[int]): True labels. - pred_probs (Iterable[float]): Predicted probabilities. - positive_rate_thresholds (Iterable[float]): Positive_rate_thresholds to add to the table, e.g. 0.99, 0.98 etc. + labels (Sequence[int]): True labels. + pred_probs (Sequence[float]): Predicted probabilities. + positive_rate_thresholds (Sequence[float]): Positive_rate_thresholds to add to the table, e.g. 0.99, 0.98 etc. Calculated so that the Xth percentile of predictions are classified as the positive class. - pred_proba_thresholds (Iterable[float]): Thresholds above which predictions are classified as positive. 
- ids (Iterable[Union[int, float]]): Ids to group on. - pred_timestamps (Iterable[ pd.Timestamp ]): Timestamp for each prediction time. - outcome_timestamps (Iterable[pd.Timestamp]): Timestamp for each outcome time. + pred_proba_thresholds (Sequence[float]): Thresholds above which predictions are classified as positive. + ids (Sequence[Union[int, float]]): Ids to group on. + pred_timestamps (Sequence[ pd.Timestamp ]): Timestamp for each prediction time. + outcome_timestamps (Sequence[pd.Timestamp]): Timestamp for each outcome time. output_format (str, optional): Format to output - either "df" or "wandb_table". Defaults to "df". Returns: diff --git a/src/psycopt2d/train_and_log_models.py b/src/psycopt2d/train_and_log_models.py index 499c539a..3608733f 100644 --- a/src/psycopt2d/train_and_log_models.py +++ b/src/psycopt2d/train_and_log_models.py @@ -7,6 +7,7 @@ - Replace the HYDRA_ARGS string with the desired arguments for `train_model.py` - Run this script from project root with `python src/psycopt2d/train_and_log_models.py` """ + import subprocess import time @@ -16,7 +17,7 @@ # RUN CONSTANTS CONFIG_NAME = "integration_testing.yaml" -HYDRA_ARGS = f"--multirun +model=xgboost project.wandb_mode='dryrun' model.args.tree_method='auto' --config-name {CONFIG_NAME}" +HYDRA_ARGS = f"--multirun project.wandb.mode='dryrun' model.args.tree_method='auto' --config-name {CONFIG_NAME}" OVERTACI = "false" # Change to "true" if running on overtaci # WATCHER CONSTANTS @@ -70,15 +71,3 @@ time.sleep(60 * KEEP_WATCHER_ALIVE_AFTER_TRAINING_FINISHED_MINUTES) watcher.kill() msg.good("Watcher stopped.") - - # any_process_done = False # pylint: disable=invalid-name - # for process in (trainer, watcher): - # while process.poll() is None: - # if any_process_done: - # # kill the watcher if the trainer is done - # # but allow some time to finish evaluation - # time.sleep(KEEP_WATCHER_ALIVE_AFTER_TRAINING_FINISHED_MINUTES * 60) - # process.kill() - # time.sleep(1) - # any_process_done = True # pylint: disable=invalid-name - # process.kill() diff --git a/src/psycopt2d/train_model.py b/src/psycopt2d/train_model.py index 4bd41b13..90b2ce98 100644 --- a/src/psycopt2d/train_model.py +++ b/src/psycopt2d/train_model.py @@ -1,14 +1,13 @@ """Training script for training a single model for predicting t2d.""" import os from collections.abc import Iterable -from datetime import datetime -from pathlib import Path -from typing import Optional +from typing import Any, Optional import hydra import numpy as np import pandas as pd import wandb +from omegaconf import OmegaConf from omegaconf.dictconfig import DictConfig from sklearn.feature_selection import SelectPercentile, chi2, f_classif from sklearn.impute import SimpleImputer @@ -25,15 +24,16 @@ ConvertToBoolean, DateTimeConverter, ) -from psycopt2d.utils import ( +from psycopt2d.utils.configs import FullConfig, omegaconf_to_pydantic_objects +from psycopt2d.utils.utils import ( + PROJECT_ROOT, create_wandb_folders, flatten_nested_dict, get_feature_importance_dict, prediction_df_with_metadata_to_disk, ) -CONFIG_PATH = Path(__file__).parent / "config" -TRAINING_COL_NAME_PREFIX = "pred_" +CONFIG_PATH = PROJECT_ROOT / "src" / "psycopt2d" / "config" # Handle wandb not playing nice with joblib os.environ["WANDB_START_METHOD"] = "thread" @@ -64,23 +64,23 @@ def create_preprocessing_pipeline(cfg): ("z-score-normalization", StandardScaler()), ) - if cfg.preprocessing.feature_selection_method == "f_classif": + if cfg.preprocessing.feature_selection.name == "f_classif": steps.append( ( 
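
The `feature_selection` config block is translated into a `SelectPercentile` step in the preprocessing pipeline, as the appended step below shows. For reference, a self-contained sketch of what that selector does on synthetic data (not the project's pipeline or data):

```python
import numpy as np
from sklearn.feature_selection import SelectPercentile, f_classif

rng = np.random.default_rng(42)
X = rng.normal(size=(100, 20))  # 20 candidate predictors
y = (X[:, 0] + rng.normal(size=100) > 0).astype(int)  # only feature 0 is informative

selector = SelectPercentile(f_classif, percentile=10)  # keep the top 10% of features
X_selected = selector.fit_transform(X, y)

print(X_selected.shape)        # (100, 2): 10% of 20 features survive
print(selector.get_support())  # boolean mask over the original columns
```

In the pipeline below, the percentile comes from `cfg.preprocessing.feature_selection.params["percentile"]`.
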
"feature_selection", SelectPercentile( f_classif, - percentile=cfg.preprocessing.feature_selection_params.percentile, + percentile=cfg.preprocessing.feature_selection.params["percentile"], ), ), ) - if cfg.preprocessing.feature_selection_method == "chi2": + if cfg.preprocessing.feature_selection.name == "chi2": steps.append( ( "feature_selection", SelectPercentile( chi2, - percentile=cfg.preprocessing.feature_selection_params.percentile, + percentile=cfg.preprocessing.feature_selection.params["percentile"], ), ), ) @@ -311,7 +311,7 @@ def get_col_names(cfg: DictConfig, train: pd.DataFrame) -> tuple[str, list[str]] """ outcome_col_name = ( # pylint: disable=invalid-name - f"outc_dichotomous_t2d_within_{cfg.data.lookahead_days}_days_max_fallback_0" + f"outc_dichotomous_t2d_within_{cfg.data.min_lookahead_days}_days_max_fallback_0" ) train_col_names = [ # pylint: disable=invalid-name @@ -326,24 +326,37 @@ def get_col_names(cfg: DictConfig, train: pd.DataFrame) -> tuple[str, list[str]] config_name="default_config", version_base="1.2", ) -def main(cfg): +def main(cfg: DictConfig): """Main function for training a single model.""" + # Save dictconfig for easier logging + if isinstance(cfg, DictConfig): + # Create flattened dict for logging to wandb + # Wandb doesn't allow configs to be nested, so we + # flatten it. + dict_config_to_log: dict[str, Any] = flatten_nested_dict(OmegaConf.to_container(cfg), sep=".") # type: ignore + else: + # For testing, we can take a FullConfig object instead. Simplifies boilerplate. + dict_config_to_log = cfg.__dict__ + + if not isinstance(cfg, FullConfig): + cfg = omegaconf_to_pydantic_objects(cfg) + msg = Printer(timestamp=True) create_wandb_folders() - # Get today's date as str - today_str = datetime.now().strftime("%Y-%m-%d") - run = wandb.init( project=cfg.project.name, reinit=True, - config=flatten_nested_dict(cfg, sep="."), - mode=cfg.project.wandb_mode, - group=today_str, - entity=cfg.project.wandb_entity, + config=dict_config_to_log, + mode=cfg.project.wandb.mode, + group=cfg.project.wandb.group, + entity=cfg.project.wandb.entity, ) + if run is None: + raise ValueError("Failed to initialise Wandb") + dataset = load_train_and_val_from_cfg(cfg) msg.info("Creating pipeline") @@ -359,7 +372,7 @@ def main(cfg): pipe=pipe, outcome_col_name=outcome_col_name, train_col_names=train_col_names, - n_splits=cfg.training.n_splits, + n_splits=cfg.train.n_splits, ) # Save model predictions, feature importance, and config to disk @@ -367,7 +380,7 @@ def main(cfg): # only run full evaluation if wandb mode mode is online # otherwise delegate to watcher script - if cfg.project.wandb_mode == "run": + if cfg.project.wandb.mode == "run": msg.info("Evaluating model") evaluate_model( cfg=cfg, @@ -376,6 +389,8 @@ def main(cfg): y_hat_prob_col_name="y_hat_prob", feature_importance_dict=get_feature_importance_dict(pipe), run=run, + pipe=pipe, + train_col_names=train_col_names, ) roc_auc = roc_auc_score( @@ -384,7 +399,13 @@ def main(cfg): ) msg.info(f"ROC AUC: {roc_auc}") - run.log({"roc_auc_unweighted": roc_auc}) + run.log( + { + "roc_auc_unweighted": roc_auc, + "lookbehind": max(cfg.data.lookbehind_combination), + "lookahead": cfg.data.min_lookahead_days, + }, + ) run.finish() return roc_auc diff --git a/src/psycopt2d/utils/__init__.py b/src/psycopt2d/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/psycopt2d/utils/configs.py b/src/psycopt2d/utils/configs.py new file mode 100644 index 00000000..1b43ef45 --- /dev/null +++ 
b/src/psycopt2d/utils/configs.py @@ -0,0 +1,152 @@ +"""Utilities for converting config yamls to pydantic objects. + +Helpful because it makes them: +- Addressable with intellisense, +- Refactorable with IDEs, +- Easier to document with docstrings and +- Type checkable +""" + +from datetime import datetime +from pathlib import Path +from typing import Optional, Union + +from omegaconf import DictConfig, OmegaConf +from pydantic import BaseModel as PydanticBaseModel + + +class BaseModel(PydanticBaseModel): + """Allow arbitrary types in all pydantic models.""" + + class Config: + """Allow arbitrary types.""" + + arbitrary_types_allowed = True + allow_mutation = False + + +class WandbConf(BaseModel): + """Configuration for weights and biases.""" + + group: str + mode: str + entity: str + + +class WatcherConf(BaseModel): + """Configuration for watchers.""" + + archive_all: bool + keep_alive_after_training_minutes: Union[int, float] + n_runs_before_eval: int + verbose: bool + + +class ProjectConf(BaseModel): + """Project configuration.""" + + wandb: WandbConf + name: str = "psycopt2d" + seed: int + watcher: WatcherConf + + +class DataConf(BaseModel): + """Data configuration.""" + + n_training_samples: Optional[ + int + ] # (int, null): Number of training samples to use, defaults to null in which cases it uses all samples. + dir: Union[Path, str] + suffix: str # File suffix to load. + + # Feature specs + pred_col_name_prefix: str # prefix of predictor columns + pred_timestamp_col_name: str # (str): Column name for prediction times + outcome_timestamp_col_name: str # (str): Column name for outcome timestamps + id_col_name: str # (str): Citizen colnames + + # Looking ahead + min_lookahead_days: int # (int): Drop all prediction times where (max timestamp in the dataset) - (current timestamp) is less than min_lookahead_days + drop_patient_if_outcome_before_date: Optional[Union[str, datetime]] + + # Looking behind + # (int): Drop all prediction times where (prediction_timestamp) - (min timestamp in the dataset) is less than min_lookbehind_days + min_prediction_time_date: Optional[Union[str, datetime]] + min_lookbehind_days: int + max_lookbehind_days: Optional[int] + lookbehind_combination: Optional[list[int]] + + +class FeatureSelectionConf(BaseModel): + """Configuration for feature selection methods.""" + + name: Optional[str] + params: Optional[dict] + + +class PreprocessingConf(BaseModel): + """Preprocessing config.""" + + convert_to_boolean: bool # (Boolean): Convert all prediction values (except gender) to boolean. Defaults to False + convert_datetimes_to: bool # (str): Options include ordinal or False + imputation_method: Optional[str] # (str): Options include "most_frequent" + transform: Optional[ + str + ] # (str|null): Transformation applied to all predictors after imputation. Options include "z-score-normalization" + feature_selection: FeatureSelectionConf + + +class ModelConf(BaseModel): + """Model configuration.""" + + model_name: str # (str): Model, can currently take xgboost + require_imputation: bool # (bool): Whether the model requires imputation. (shouldn't this be false?) + args: dict + + +class TrainConf(BaseModel): + """Training configuration.""" + + n_splits: int # ? How do we handle whether to use crossvalidation or train/val splitting? 
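
Converting the omegaconf `DictConfig` into these frozen pydantic models means configuration errors surface when the config is built rather than mid-training. A small illustration (field values are made up):

```python
from pydantic import ValidationError

from psycopt2d.utils.configs import WandbConf

conf = WandbConf(group="baseline", mode="offline", entity="my-entity")

# The configs are frozen (allow_mutation = False), so accidental overrides fail loudly:
try:
    conf.mode = "run"
except TypeError as e:
    print(e)  # the model is immutable and does not support item assignment

# Missing or mistyped fields are caught at load time instead of deep inside a training run:
try:
    WandbConf(group="baseline", mode="offline")
except ValidationError as e:
    print(e)  # entity: field required
```
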
+ n_trials_per_lookdirection_combination: int + n_active_trainers: int # Number of subprocesses to spawn when training + gpu: bool + + +class EvalConf(BaseModel): + """Evaluation config.""" + + threshold_percentiles: list[int] + + # top n features to plot. A table with all features is also logged + top_n_feature_importances: int + + positive_rate_thresholds: list[int] + save_model_predictions_on_overtaci: bool + date_bins_ahead: list[int] + date_bins_behind: list[int] + + +class FullConfig(BaseModel): + """A full configuration object.""" + + project: ProjectConf + data: DataConf + preprocessing: PreprocessingConf + model: ModelConf + train: TrainConf + eval: EvalConf + + +def omegaconf_to_pydantic_objects(conf: DictConfig) -> FullConfig: + """Converts an omegaconf DictConfig to a pydantic object. + + Args: + conf (DictConfig): Omegaconf DictConfig + + Returns: + FullConfig: Pydantic object + """ + conf = OmegaConf.to_container(conf, resolve=True) # type: ignore + return FullConfig(**conf) diff --git a/src/psycopt2d/utils.py b/src/psycopt2d/utils/utils.py similarity index 84% rename from src/psycopt2d/utils.py rename to src/psycopt2d/utils/utils.py index 0edffe57..8e820a74 100644 --- a/src/psycopt2d/utils.py +++ b/src/psycopt2d/utils/utils.py @@ -13,13 +13,13 @@ import dill as pkl import numpy as np import pandas as pd -from omegaconf.dictconfig import DictConfig from sklearn.pipeline import Pipeline from wandb.sdk.wandb_run import Run # pylint: disable=no-name-in-module from wasabi import msg -from psycopt2d.dataclasses.configs import ModelEvalData +from psycopt2d.configs import ModelEvalData from psycopt2d.model_performance import ModelPerformance +from psycopt2d.utils.configs import FullConfig SHARED_RESOURCES_PATH = Path(r"E:\shared_resources") FEATURE_SETS_PATH = SHARED_RESOURCES_PATH / "feature_sets" @@ -27,7 +27,8 @@ RAW_DATA_VALIDATION_PATH = SHARED_RESOURCES_PATH / "raw_data_validation" FEATURIZERS_PATH = SHARED_RESOURCES_PATH / "featurizers" MODEL_PREDICTIONS_PATH = SHARED_RESOURCES_PATH / "model_predictions" -PROJECT_ROOT = Path(__file__).resolve().parents[2] + +PROJECT_ROOT = Path(__file__).resolve().parents[3] def format_dict_for_printing(d: dict) -> str: @@ -59,7 +60,7 @@ def flatten_nested_dict( d: dict, parent_key: str = "", sep: str = ".", -) -> dict: +) -> dict[str, Any]: """Recursively flatten an infinitely nested dict. E.g. {"level1": {"level2": "level3": {"level4": 5}}}} becomes @@ -80,15 +81,15 @@ def flatten_nested_dict( new_key = parent_key + sep + k if parent_key else k if isinstance(v, MutableMapping): items.extend( - flatten_nested_dict(d=v, parent_key=new_key, sep=sep).items(), + flatten_nested_dict(d=v, parent_key=new_key, sep=sep).items(), # type: ignore ) # typing: ignore else: - items.append((new_key, v)) + items.append((new_key, v)) # type: ignore - return dict(items) + return dict(items) # type: ignore -def drop_records_if_datediff_days_smaller_than( +def drop_records_if_datediff_days_smaller_than( # pylint: disable=inconsistent-return-statements df: pd.DataFrame, t2_col_name: str, t1_col_name: str, @@ -157,7 +158,7 @@ def calculate_performance_metrics( A pandas dataframe with the performance metrics. 
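
`flatten_nested_dict` itself is unchanged apart from typing; it is what turns the hierarchical hydra config into the dot-separated keys wandb expects. For reference (example values invented):

```python
from psycopt2d.utils.utils import flatten_nested_dict

nested = {"project": {"wandb": {"mode": "offline"}}, "train": {"n_splits": 5}}

flatten_nested_dict(nested, sep=".")
# {'project.wandb.mode': 'offline', 'train.n_splits': 5}
```
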
""" performance_metrics = ModelPerformance.performance_metrics_from_df( - eval_df, + prediction_df=eval_df, prediction_col_name=prediction_probabilities_col_name, label_col_name=outcome_col_name, id_col_name=id_col_name, @@ -245,7 +246,7 @@ def dump_to_pickle(obj: Any, path: str) -> None: pkl.dump(obj, f) -def read_pickle(path: str) -> Any: +def read_pickle(path: Union[str, Path]) -> Any: """Reads a pickled object from a file. Args: @@ -299,7 +300,7 @@ def get_feature_importance_dict(pipe: Pipeline) -> Union[None, dict[str, float]] def prediction_df_with_metadata_to_disk( df: pd.DataFrame, - cfg: DictConfig, + cfg: FullConfig, pipe: Pipeline, run: Optional[Run] = None, ) -> None: @@ -321,7 +322,7 @@ def prediction_df_with_metadata_to_disk( else: run_descriptor = f"{timestamp}_{model_args}"[:100] - if cfg.evaluation.save_model_predictions_on_overtaci: + if cfg.eval.save_model_predictions_on_overtaci: # Save to overtaci dir_path = MODEL_PREDICTIONS_PATH / cfg.project.name / run_descriptor else: @@ -393,19 +394,55 @@ def load_evaluation_data(model_data_dir: Path) -> ModelEvalData: ) -def infer_outcome_col_name(df: pd.DataFrame, prefix: str = "outc_") -> str: - """Infer the outcome column name from the dataframe.""" - outcome_name = [c for c in df.columns if c.startswith(prefix)] - if len(outcome_name) == 1: - return outcome_name[0] +def infer_col_names( + df: pd.DataFrame, + prefix: str, + allow_multiple: bool = True, +) -> list[str]: + """Infer col names based on prefix.""" + col_name = [c for c in df.columns if c.startswith(prefix)] + + if len(col_name) == 1: + return col_name + elif len(col_name) > 1: + if allow_multiple: + return col_name + raise ValueError( + f"Multiple columns found and allow_multiple is {allow_multiple}.", + ) + elif not col_name: + raise ValueError("No outcome col name inferred") else: - raise ValueError("More than one outcome inferred") + raise ValueError("No outcomes inferred") + + +def infer_outcome_col_name( + df: pd.DataFrame, + prefix: str = "outc_", + allow_multiple: bool = True, +) -> list[str]: + """Infer the outcome column name from the dataframe.""" + return infer_col_names(df=df, prefix=prefix, allow_multiple=allow_multiple) -def infer_y_hat_prob_col_name(df: pd.DataFrame) -> str: +def infer_predictor_col_name( + df: pd.DataFrame, + prefix: str = "pred_", + allow_multiple: bool = True, +) -> Union[str, list[str]]: + """Get the predictors that are used in the model.""" + return infer_col_names(df=df, prefix=prefix, allow_multiple=allow_multiple) + + +def infer_y_hat_prob_col_name( + df: pd.DataFrame, + prefix="y_hat_prob", + allow_multiple: bool = False, +) -> list[str]: """Infer the y_hat_prob column name from the dataframe.""" - y_hat_prob_name = [c for c in df.columns if c.startswith("y_hat_prob")] - if len(y_hat_prob_name) == 1: - return y_hat_prob_name[0] - else: - raise ValueError("More than one y_hat_prob inferred") + return infer_col_names(df=df, prefix=prefix, allow_multiple=allow_multiple) + + +def get_percent_lost(n_before: Union[int, float], n_after: Union[int, float]) -> float: + """Get the percent lost.""" + return round((100 * (1 - n_after / n_before)), 2) diff --git a/src/psycopt2d/visualization/base_charts.py b/src/psycopt2d/visualization/base_charts.py index 8f1188e4..bbad7fe6 100644 --- a/src/psycopt2d/visualization/base_charts.py +++ b/src/psycopt2d/visualization/base_charts.py @@ -12,7 +12,7 @@ def plot_basic_chart( y_values: Iterable, x_title: str, y_title: str, - plot_type: Optional[Union[list[str], str]], + plot_type: 
Union[list[str], str], sort_x: Optional[Iterable[int]] = None, sort_y: Optional[Iterable[int]] = None, fig_size: Optional[tuple] = (10, 10), diff --git a/src/psycopt2d/visualization/feature_importance.py b/src/psycopt2d/visualization/feature_importance.py index 59577105..1b7d9084 100644 --- a/src/psycopt2d/visualization/feature_importance.py +++ b/src/psycopt2d/visualization/feature_importance.py @@ -47,7 +47,7 @@ def plot_feature_importances( y_values=df["feature_importances"].tolist(), x_title="Feature importance (gain)", y_title="Feature name", - sort_x=np.flip(np.arange(len(feature_importances))), + sort_x=np.flip(np.arange(len(df["feature_importances"]))), plot_type="hbar", fig_size=(16, 10), save_path=save_path, diff --git a/src/psycopt2d/visualization/performance_over_time.py b/src/psycopt2d/visualization/performance_over_time.py index e5ca1dac..b5c1d265 100644 --- a/src/psycopt2d/visualization/performance_over_time.py +++ b/src/psycopt2d/visualization/performance_over_time.py @@ -11,7 +11,7 @@ import pandas as pd from sklearn.metrics import f1_score, roc_auc_score -from psycopt2d.utils import bin_continuous_data, round_floats_to_edge +from psycopt2d.utils.utils import bin_continuous_data, round_floats_to_edge from psycopt2d.visualization.base_charts import plot_basic_chart diff --git a/src/psycopt2d/visualization/prob_over_time.py b/src/psycopt2d/visualization/prob_over_time.py index 7adba6a3..4a9fc2ac 100644 --- a/src/psycopt2d/visualization/prob_over_time.py +++ b/src/psycopt2d/visualization/prob_over_time.py @@ -141,7 +141,7 @@ def plot_prob_over_time( if __name__ == "__main__": - from psycopt2d.utils import PROJECT_ROOT + from psycopt2d.utils.utils import PROJECT_ROOT path = PROJECT_ROOT / "tests" / "test_data" / "synth_eval_data.csv" df = pd.read_csv(path) diff --git a/src/psycopt2d/visualization/sens_over_time.py b/src/psycopt2d/visualization/sens_over_time.py index c7f6be5a..306164ab 100644 --- a/src/psycopt2d/visualization/sens_over_time.py +++ b/src/psycopt2d/visualization/sens_over_time.py @@ -9,7 +9,7 @@ import numpy as np import pandas as pd -from psycopt2d.utils import PROJECT_ROOT, round_floats_to_edge +from psycopt2d.utils.utils import PROJECT_ROOT, round_floats_to_edge def create_sensitivity_by_time_to_outcome_df( @@ -303,7 +303,7 @@ def plot_sensitivity_by_time_to_outcome_heatmap( Examples: >>> from pathlib import Path - >>> from psycopt2d.utils import positive_rate_to_pred_probs + >>> from psycopt2d.utils.utils import positive_rate_to_pred_probs >>> repo_path = Path(__file__).parent.parent.parent.parent >>> path = repo_path / "tests" / "test_data" / "synth_eval_data.csv" @@ -381,7 +381,7 @@ def plot_sensitivity_by_time_to_outcome_heatmap( if __name__ == "__main__": - from psycopt2d.utils import positive_rate_to_pred_probs + from psycopt2d.utils.utils import positive_rate_to_pred_probs path = PROJECT_ROOT / "tests" / "test_data" / "synth_eval_data.csv" df = pd.read_csv(path) diff --git a/tests/test_auc_by_group_table.py b/tests/test_auc_by_group_table.py index d7d88ac3..3f91de9f 100644 --- a/tests/test_auc_by_group_table.py +++ b/tests/test_auc_by_group_table.py @@ -2,10 +2,10 @@ # pylint: disable=missing-function-docstring from psycopt2d.tables import auc_by_group_df -from psycopt2d.utils import bin_continuous_data +from psycopt2d.utils.utils import bin_continuous_data -def test_auc_by_group_table(synth_data): +def test_auc_by_group_df(synth_data): synth_data["Age bins"] = bin_continuous_data( synth_data["age"], bins=[0, 18, 30, 50, 120], diff --git 
a/tests/test_calculate_performance_metrics.py b/tests/test_calculate_performance_metrics.py index db630a4f..0f053e71 100644 --- a/tests/test_calculate_performance_metrics.py +++ b/tests/test_calculate_performance_metrics.py @@ -1,5 +1,5 @@ # import wandb -# from psycopt2d.utils import calculate_performance_metrics +# from psycopt2d.utils.utils import calculate_performance_metrics # def test_log_performance_metrics(synth_data): diff --git a/tests/test_load.py b/tests/test_load.py index c0be3a58..90991383 100644 --- a/tests/test_load.py +++ b/tests/test_load.py @@ -2,6 +2,7 @@ from hydra import compose, initialize from psycopt2d.load import load_train_and_val_from_cfg +from psycopt2d.utils.configs import omegaconf_to_pydantic_objects def test_load_lookbehind_exceeds_lookbehind_threshold(): @@ -10,14 +11,13 @@ def test_load_lookbehind_exceeds_lookbehind_threshold(): with initialize(version_base=None, config_path="../src/psycopt2d/config/"): cfg = compose( config_name="integration_testing.yaml", - overrides=[ - "++data.min_lookbehind_days=90", - ], + overrides=["data.min_lookbehind_days=60"], ) + cfg = omegaconf_to_pydantic_objects(cfg) split_dataset = load_train_and_val_from_cfg(cfg) - assert split_dataset.train.shape == (644, 7) + assert split_dataset.train.shape[1] == 7 def test_load_lookbehind_not_in_lookbehind_combination(): @@ -26,11 +26,11 @@ def test_load_lookbehind_not_in_lookbehind_combination(): with initialize(version_base=None, config_path="../src/psycopt2d/config/"): cfg = compose( config_name="integration_testing.yaml", - overrides=[ - "++data.lookbehind_combination=[30]", - ], + overrides=["data.lookbehind_combination=[30]"], ) + cfg = omegaconf_to_pydantic_objects(cfg) + split_dataset = load_train_and_val_from_cfg(cfg) - assert split_dataset.train.shape == (700, 6) + assert split_dataset.train.shape[1] == 6 diff --git a/tests/test_performance_by_threshold.py b/tests/test_performance_by_threshold.py index 6d3b0630..499640e0 100644 --- a/tests/test_performance_by_threshold.py +++ b/tests/test_performance_by_threshold.py @@ -14,7 +14,7 @@ days_from_first_positive_to_diagnosis, generate_performance_by_positive_rate_table, ) -from psycopt2d.utils import positive_rate_to_pred_probs +from psycopt2d.utils.utils import positive_rate_to_pred_probs @pytest.fixture(scope="function") diff --git a/tests/test_train_model.py b/tests/test_train_model.py index 90cd70dd..21762d11 100644 --- a/tests/test_train_model.py +++ b/tests/test_train_model.py @@ -3,12 +3,14 @@ import pytest from hydra import compose, initialize +from psycopt2d.load import load_train_from_cfg from psycopt2d.models import MODELS from psycopt2d.train_model import main +from psycopt2d.utils.configs import omegaconf_to_pydantic_objects CONFIG_DIR_PATH = "../src/psycopt2d/config/" -CONFIG_FILE_NAME = "integration_testing.yaml" -INTEGRATION_TESTING_MODEL_OVERRIDE = "+model=logistic-regression" +INTEGRATION_TEST_FILE_NAME = "integration_testing.yaml" +INTEGRATION_TESTING_MODEL_OVERRIDE = "model=logistic-regression" @pytest.mark.parametrize("model_name", MODELS.keys()) @@ -17,14 +19,16 @@ def test_main(model_name): with initialize(version_base=None, config_path=CONFIG_DIR_PATH): cfg = compose( - config_name=CONFIG_FILE_NAME, - overrides=[f"+model={model_name}"], + config_name=INTEGRATION_TEST_FILE_NAME, + overrides=[f"model={model_name}"], ) + cfg = omegaconf_to_pydantic_objects(cfg) + # XGBoost should train on GPU on Overtaci, # but CPU during integration testing if model_name == "xgboost": - cfg.model.args.tree_method = "auto" + 
cfg.model.args["tree_method"] = "auto" main(cfg) @@ -38,7 +42,7 @@ def test_integration_test(): with initialize(version_base=None, config_path=CONFIG_DIR_PATH): cfg = compose( - config_name=CONFIG_FILE_NAME, + config_name=INTEGRATION_TEST_FILE_NAME, overrides=[INTEGRATION_TESTING_MODEL_OVERRIDE], ) main(cfg) @@ -48,7 +52,7 @@ def test_crossvalidation(): """Test crossvalidation.""" with initialize(version_base=None, config_path=CONFIG_DIR_PATH): cfg = compose( - config_name=CONFIG_FILE_NAME, + config_name=INTEGRATION_TEST_FILE_NAME, overrides=[INTEGRATION_TESTING_MODEL_OVERRIDE, "+data.n_splits=2"], ) main(cfg) @@ -58,7 +62,7 @@ def test_min_prediction_time_date(): """Test crossvalidation.""" with initialize(version_base=None, config_path=CONFIG_DIR_PATH): cfg = compose( - config_name=CONFIG_FILE_NAME, + config_name=INTEGRATION_TEST_FILE_NAME, overrides=[ INTEGRATION_TESTING_MODEL_OVERRIDE, "+data.min_prediction_time_date=1972-01-01", @@ -70,13 +74,14 @@ def test_min_prediction_time_date(): def test_feature_selection(): """Test feature selection.""" with initialize(version_base=None, config_path=CONFIG_DIR_PATH): + cfg = compose( - config_name=CONFIG_FILE_NAME, + config_name=INTEGRATION_TEST_FILE_NAME, overrides=[ INTEGRATION_TESTING_MODEL_OVERRIDE, - "preprocessing.feature_selection_method=f_classif", - "preprocessing.feature_selection_params.percentile=10", - # "project.wandb_mode=run", + "preprocessing.feature_selection.name=f_classif", + "preprocessing.feature_selection.params.percentile=10", ], ) + main(cfg) diff --git a/tests/test_utils.py b/tests/test_utils.py index b3ddf1fe..f619872d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,16 @@ """Testing of the utils module.""" # pylint: disable=missing-function-docstring +from pathlib import Path + import numpy as np import pandas as pd +import pytest +from hydra import compose, initialize from utils_for_testing import str_to_df -from psycopt2d.utils import ( +from psycopt2d.utils.configs import omegaconf_to_pydantic_objects +from psycopt2d.utils.utils import ( + PROJECT_ROOT, drop_records_if_datediff_days_smaller_than, flatten_nested_dict, ) @@ -50,3 +56,23 @@ def test_flatten_nested_dict(): output_dict = flatten_nested_dict(input_dict) assert expected_dict == output_dict + + +CONFIG_DIR_PATH_ABS = PROJECT_ROOT / "src" / "psycopt2d" / "config" +CONFIG_DIR_PATH_REL = "../src/psycopt2d/config" + + +def get_config_file_names() -> list[str]: + """Get all config file names.""" + config_file_paths: list[Path] = list(CONFIG_DIR_PATH_ABS.glob("*.yaml")) + return [f"{path.stem}.yaml" for path in config_file_paths] + + +@pytest.mark.parametrize("config_file_name", get_config_file_names()) +def test_configs(config_file_name): + with initialize(version_base=None, config_path=CONFIG_DIR_PATH_REL): + cfg = compose( + config_name=config_file_name, + ) + + cfg = omegaconf_to_pydantic_objects(conf=cfg) diff --git a/tests/test_visualizations.py b/tests/test_visualizations.py index b108e04f..963fa5d1 100644 --- a/tests/test_visualizations.py +++ b/tests/test_visualizations.py @@ -10,7 +10,7 @@ import pytest from sklearn.metrics import f1_score, roc_auc_score -from psycopt2d.utils import positive_rate_to_pred_probs +from psycopt2d.utils.utils import positive_rate_to_pred_probs from psycopt2d.visualization import plot_prob_over_time from psycopt2d.visualization.base_charts import plot_basic_chart from psycopt2d.visualization.feature_importance import plot_feature_importances