Skip to content
This repository has been archived by the owner on May 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #349 from Aarhus-Psychiatry-Research/martbern/refa…
Browse files Browse the repository at this point in the history
…ctor_main

Refactor main
  • Loading branch information
MartinBernstorff authored Dec 22, 2022
2 parents a4b4d91 + 5fa4ddb commit c9b3b8d
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 227 deletions.
219 changes: 13 additions & 206 deletions application/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,232 +5,39 @@
- Replace the HYDRA_ARGS string with the desired arguments for `train_model.py`
- Run this script from project root with `python src/psycop_model_training/train_and_log_models.py`
"""
import random
import subprocess
import time
from typing import Optional

import pandas as pd
import wandb
from psycopmlutils.wandb.wandb_try_except_decorator import wandb_alert_on_exception
from random_word import RandomWords
from wasabi import Printer

from psycop_model_training.data_loader.data_loader import DataLoader
from psycop_model_training.utils.col_name_inference import (
infer_look_distance,
infer_outcome_col_name,
)
from psycop_model_training.utils.config_schemas.conf_utils import (
BaseModel,
load_app_cfg_as_pydantic,
from psycop_model_training.application_modules.get_search_space import (
SearchSpaceInferrer,
)
from psycop_model_training.utils.config_schemas.full_config import FullConfigSchema


def start_trainer(
    cfg: FullConfigSchema,
    config_file_name: str,
    lookahead_days: int,
    wandb_group_override: str,
    model_name: str,
) -> subprocess.Popen:
    """Launch ``application/train_model.py`` as a child process.

    Args:
        cfg: Full config; supplies the wandb mode and the sweeper settings
            (``n_trials_per_lookahead``, ``n_jobs_per_trainer``).
        config_file_name: Value passed after ``--config-name``.
        lookahead_days: Override for ``preprocessing.pre_split.min_lookahead_days``.
        wandb_group_override: Override for ``project.wandb.group``.
        model_name: Override for ``model``.

    Returns:
        The handle of the started process. The caller is responsible for
        polling it.
    """
    config_overrides = [
        f"project.wandb.group='{wandb_group_override}'",
        f"project.wandb.mode={cfg.project.wandb.mode}",
        f"hydra.sweeper.n_trials={cfg.train.n_trials_per_lookahead}",
        f"hydra.sweeper.n_jobs={cfg.train.n_jobs_per_trainer}",
        f"model={model_name}",
        f"preprocessing.pre_split.min_lookahead_days={lookahead_days}",
    ]

    command: list[str] = [
        "python",
        "application/train_model.py",
        *config_overrides,
        "--config-name",
        f"{config_file_name}",
    ]

    # More than one trial means a sweep, so the flag goes right after the script path.
    if cfg.train.n_trials_per_lookahead > 1:
        command.insert(2, "--multirun")

    # Inserted at a fixed index, so its final position depends on whether
    # "--multirun" was added above.
    if model_name == "xgboost":
        command.insert(3, "++model.args.tree_method='gpu_hist'")

    Printer(timestamp=True).info(" ".join(command))

    # The process must outlive this function, so no context manager is used.
    return subprocess.Popen(args=command)  # pylint: disable=consider-using-with


class TrainerSpec(BaseModel):
    """Specification for starting a trainer.

    Provides overrides for the config file. One spec describes a single
    (lookahead, model) combination.
    """

    # Lookahead distance, in days, that the trainer should use.
    lookahead_days: int
    # Name of the model the trainer should fit.
    model_name: str


def combine_lookaheads_and_model_names_to_trainer_specs(
    cfg: FullConfigSchema,
    possible_lookahead_days: list[int],
    model_names: Optional[list[str]] = None,
) -> list[TrainerSpec]:
    """Generate trainer specs for all combinations of lookaheads and model
    names.

    Args:
        cfg: Full config; ``cfg.model.name`` is used when ``model_names`` is
            empty or ``None``.
        possible_lookahead_days: Lookahead distances (in days) to train for.
            The list is not modified.
        model_names: Model names that override ``cfg.model.name``.

    Returns:
        One ``TrainerSpec`` per (lookahead, model) pair. Lookaheads appear in
        random order; for each lookahead the models keep their given order.
    """
    msg = Printer(timestamp=True)

    # Shuffle a private copy so the caller's list keeps its order.
    shuffled_lookaheads = list(possible_lookahead_days)
    random.shuffle(shuffled_lookaheads)

    if model_names:
        msg.warn(
            "model_names was specified in train_models_for_each_cell_in_grid, overriding cfg.model.name",
        )
        model_name_queue = list(model_names)
    else:
        configured_name = cfg.model.name
        # A single name given as a plain string must be wrapped; iterating the
        # string itself would produce one spec per character.
        model_name_queue = (
            [configured_name]
            if isinstance(configured_name, str)
            else list(configured_name)
        )

    # Create all combinations of lookahead_days and models
    return [
        TrainerSpec(lookahead_days=lookahead_days, model_name=model_name)
        for lookahead_days in shuffled_lookaheads
        for model_name in model_name_queue
    ]


def train_models_for_each_cell_in_grid(
    cfg: FullConfigSchema,
    possible_lookahead_days: list[int],
    config_file_name: str,
    wandb_prefix: str,
    model_names: Optional[list[str]] = None,
):
    """Run one trainer process per (lookahead, model) combination.

    At most ``cfg.train.n_active_trainers`` processes run at the same time.
    The function returns once every spec has been started and every started
    process has exited.

    Args:
        cfg: Full config.
        possible_lookahead_days: Lookahead distances (in days) to train for.
        config_file_name: Config file name forwarded to each trainer.
        wandb_prefix: Used as the wandb group of every trainer.
        model_names: Optional model names overriding ``cfg.model.name``.
    """
    pending_specs = combine_lookaheads_and_model_names_to_trainer_specs(
        cfg=cfg,
        possible_lookahead_days=possible_lookahead_days,
        model_names=model_names,
    )
    running: list[subprocess.Popen] = []

    while pending_specs or running:
        slot_available = len(running) < cfg.train.n_active_trainers

        if slot_available and len(pending_specs) != 0:
            # Start a new trainer
            spec = pending_specs.pop()

            Printer(timestamp=True).info(
                f"Spawning a new trainer with lookahead={spec.lookahead_days} days",
            )

            process = start_trainer(
                cfg=cfg,
                config_file_name=config_file_name,
                lookahead_days=spec.lookahead_days,
                wandb_group_override=f"{wandb_prefix}",
                model_name=spec.model_name,
            )
            running.append(process)

            # Sleep a bit to avoid segfaults
            time.sleep(10)
        else:
            # Either all slots are taken or nothing is left to start:
            # forget finished trainers (poll() is None while still running)
            # and check again shortly.
            running = [proc for proc in running if proc.poll() is None]
            time.sleep(1)


def get_possible_lookaheads(
    msg: Printer,
    cfg: FullConfigSchema,
    train_df: pd.DataFrame,
) -> list[int]:
    """Return the lookahead distances that the training data can support.

    Candidate distances are inferred from the outcome column names. A
    candidate is dropped when it is longer (in days) than the span between
    the earliest and latest prediction timestamp in ``train_df``, since no
    row could then satisfy it.

    Args:
        msg: Printer used to report dropped distances.
        cfg: Full config; supplies the prediction timestamp column name.
        train_df: Training data.

    Returns:
        The remaining lookahead distances in days, without duplicates.
    """
    candidate_days: list[int] = [
        int(distance)
        for distance in infer_look_distance(
            col_name=infer_outcome_col_name(df=train_df, allow_multiple=True),
        )
    ]

    # Don't try look distance combinations which will result in 0 rows
    timestamps = train_df[cfg.data.col_name.pred_timestamp]
    dataset_span_days = (max(timestamps) - min(timestamps)).days

    too_long: list[int] = [
        days for days in candidate_days if days > dataset_span_days
    ]

    if too_long:
        msg.info(
            f"Not fitting model to {too_long}, since no rows satisfy the criteria.",
        )

    return list(set(candidate_days) - set(too_long))
from psycop_model_training.application_modules.setup import setup
from psycop_model_training.application_modules.trainer_spawner import spawn_trainers
from psycop_model_training.data_loader.data_loader import DataLoader


@wandb_alert_on_exception
def main():
"""Main."""
msg = Printer(timestamp=True)

config_file_name = "default_config.yaml"

cfg = load_app_cfg_as_pydantic(config_file_name=config_file_name)

random_word = RandomWords()
wandb_group = f"{random_word.get_random_word()}-{random_word.get_random_word()}"

wandb.init(
project=f"{cfg.project.name}-baseline-model-training",
mode=cfg.project.wandb.mode,
group=wandb_group,
entity=cfg.project.wandb.entity,
name="process_manager",
)
cfg, wandb_group = setup(config_file_name=config_file_name)

# Load dataset without dropping any rows for inferring
# which look distances to grid search over
train = DataLoader(cfg=cfg).load_dataset_from_dir(split_names="train")
train_df = DataLoader(cfg=cfg).load_dataset_from_dir(split_names="train")

possible_lookaheads = get_possible_lookaheads(
msg=msg,
trainer_specs = SearchSpaceInferrer(
cfg=cfg,
train_df=train,
)
train_df=train_df,
model_names=["xgboost", "logistic_regression"],
).get_trainer_specs()

train_models_for_each_cell_in_grid(
spawn_trainers(
cfg=cfg,
possible_lookahead_days=possible_lookaheads,
config_file_name=config_file_name,
wandb_prefix=wandb_group,
model_names=["xgboost", "logistic-regression"],
trainer_specs=trainer_specs,
)


Expand Down
23 changes: 23 additions & 0 deletions application/train_model_from_application_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Script using the train_model module to train a model.
Required to allow the trainer_spawner to point towards a python script
file, rather than an installed module.
"""
import hydra

from psycop_model_training.application_modules.train_model import train_model
from psycop_model_training.training.train_and_eval import CONFIG_PATH


@hydra.main(
    config_path=str(CONFIG_PATH),
    config_name="default_config",
    version_base="1.2",
)
def main(cfg):
    """Train a single model with the config composed by Hydra.

    Hydra calls the decorated function with the composed config as its only
    positional argument, so the function must accept it.

    Args:
        cfg: Config composed by Hydra from ``default_config`` and any
            command-line overrides.

    Returns:
        Whatever ``train_model`` returns, so that Hydra can hand the value
        on to a sweeper.
    """
    # NOTE(review): assumes train_model takes the config as its first
    # argument — confirm against its signature.
    return train_model(cfg)


# Only start training when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Empty file.
126 changes: 126 additions & 0 deletions src/psycop_model_training/application_modules/get_search_space.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import random
from typing import List, Optional, Union

import pandas as pd
from wasabi import Printer

from psycop_model_training.utils.basemodel import BaseModel
from psycop_model_training.utils.col_name_inference import (
infer_look_distance,
infer_outcome_col_name,
)
from psycop_model_training.utils.config_schemas.full_config import FullConfigSchema


class TrainerSpec(BaseModel):
    """Specification for starting a trainer.

    Provides overrides for the config file. One spec describes a single
    (lookahead, model) combination.
    """

    # Lookahead distance, in days, that the trainer should use.
    lookahead_days: int
    # Name of the model the trainer should fit.
    model_name: str


class SearchSpaceInferrer:
    """Infer the search space for the model training pipeline.

    Pairs every lookahead distance that the training data can support with
    every model name, producing one ``TrainerSpec`` per pair.
    """

    def __init__(
        self,
        cfg: FullConfigSchema,
        train_df: pd.DataFrame,
        model_names: list[str],
    ):
        """Store the inputs used for inference.

        Args:
            cfg: Full config; supplies the prediction timestamp column name
                and, when ``model_names`` is empty, the model name.
            train_df: Training data.
            model_names: Model names to train; overrides ``cfg.model.name``
                when non-empty.
        """
        self.cfg = cfg
        self.train_df = train_df
        self.model_names = model_names

    def _get_impossible_lookaheads(
        self,
        potential_lookaheads: list[int],
    ) -> list[int]:
        """Return the lookaheads that no row of the training data can satisfy.

        A lookahead is impossible when it is longer (in days) than the span
        between the earliest and latest prediction timestamp, e.g. a 5 year
        lookahead with only 4 years of data.

        Args:
            potential_lookaheads: Candidate lookahead distances in days.

        Returns:
            The candidates exceeding the span of the data, in input order.
        """
        timestamps = self.train_df[self.cfg.data.col_name.pred_timestamp]
        max_interval_days = (max(timestamps) - min(timestamps)).days

        lookaheads_without_rows: list[int] = [
            dist for dist in potential_lookaheads if dist > max_interval_days
        ]

        if lookaheads_without_rows:
            msg = Printer(timestamp=True)
            msg.info(
                f"Not fitting model to {lookaheads_without_rows}, since no rows satisfy the criteria.",
            )

        return lookaheads_without_rows

    def _get_possible_lookaheads(self) -> list[int]:
        """Return the lookahead distances that the training data can support.

        Candidates are inferred from the outcome column names of the training
        data; those reported by ``_get_impossible_lookaheads`` are removed.

        Returns:
            The remaining lookahead distances in days, without duplicates.
        """
        outcome_col_names = infer_outcome_col_name(
            df=self.train_df,
            allow_multiple=True,
        )

        potential_lookaheads: list[int] = [
            int(dist) for dist in infer_look_distance(col_name=outcome_col_names)
        ]

        impossible_lookaheads = self._get_impossible_lookaheads(
            potential_lookaheads=potential_lookaheads,
        )

        return list(set(potential_lookaheads) - set(impossible_lookaheads))

    def _combine_lookaheads_and_model_names_to_trainer_specs(
        self,
        possible_lookahead_days: list[int],
    ) -> list[TrainerSpec]:
        """Generate trainer specs for all combinations of lookaheads and model
        names.

        Args:
            possible_lookahead_days: Lookahead distances (in days) to train
                for. The list is not modified.

        Returns:
            One ``TrainerSpec`` per (lookahead, model) pair. Lookaheads appear
            in random order; for each lookahead the models keep their order.
        """
        msg = Printer(timestamp=True)

        # Shuffle a private copy so the caller's list keeps its order.
        shuffled_lookaheads = list(possible_lookahead_days)
        random.shuffle(shuffled_lookaheads)

        if self.model_names:
            msg.warn(
                "model_names was specified in train_models_for_each_cell_in_grid, overriding self.cfg.model.name",
            )
            model_name_queue = list(self.model_names)
        else:
            configured_name = self.cfg.model.name
            # A single name given as a plain string must be wrapped; iterating
            # the string itself would produce one spec per character.
            model_name_queue = (
                [configured_name]
                if isinstance(configured_name, str)
                else list(configured_name)
            )

        # Create all combinations of lookahead_days and models
        return [
            TrainerSpec(lookahead_days=lookahead_days, model_name=model_name)
            for lookahead_days in shuffled_lookaheads
            for model_name in model_name_queue
        ]

    def get_trainer_specs(self) -> list[TrainerSpec]:
        """Get all possible combinations of lookaheads and models."""
        return self._combine_lookaheads_and_model_names_to_trainer_specs(
            possible_lookahead_days=self._get_possible_lookaheads(),
        )
Loading

0 comments on commit c9b3b8d

Please sign in to comment.