diff --git a/src/psycop_feature_generation/timeseriesflattener/flattened_dataset.py b/src/psycop_feature_generation/timeseriesflattener/flattened_dataset.py index d6a0ee0c..d7a9548a 100644 --- a/src/psycop_feature_generation/timeseriesflattener/flattened_dataset.py +++ b/src/psycop_feature_generation/timeseriesflattener/flattened_dataset.py @@ -1,6 +1,5 @@ """Takes a time-series and flattens it into a set of prediction times with describing values.""" - import datetime as dt import os from collections.abc import Callable @@ -9,10 +8,13 @@ from pathlib import Path from typing import Any, Optional, Union +import dask.dataframe as dd import numpy as np import pandas as pd from catalogue import Registry # noqa # pylint: disable=unused-import +from dask.diagnostics import ProgressBar from pandas import DataFrame +from tqdm.dask import TqdmCallback from wasabi import Printer, msg from psycop_feature_generation.timeseriesflattener.resolve_multiple_functions import ( @@ -24,6 +26,8 @@ generate_feature_colname, ) +ProgressBar().register() + def select_and_assert_keys(dictionary: dict, key_list: list[str]) -> dict: """Keep only the keys in the dictionary that are in key_order, and orders @@ -604,10 +608,18 @@ def add_temporal_predictors_from_list_of_argument_dictionaries( # pylint: disab ] msg.info("Feature generation complete, concatenating") - concatenated_dfs = pd.concat( - flattened_predictor_dfs, - axis=1, - ).reset_index() + + flattened_predictor_dds = [ + dd.from_pandas(df, npartitions=6) for df in flattened_predictor_dfs + ] + + # Concatenate with dask, and show progress bar + with TqdmCallback(desc="compute"): + concatenated_dfs = ( + dd.concat(flattened_predictor_dds, axis=1, interleave_partitions=True) + .compute() # Converts to pandas dataframe + .reset_index() + ) self.df = pd.merge( self.df,