From 5e6b2314547334bbd5dbfc9cc5e019efcb3d67c5 Mon Sep 17 00:00:00 2001 From: Oskar Triebe Date: Thu, 12 Sep 2024 18:42:25 -0700 Subject: [PATCH] [Minor] Reduce DataFrame copies (#1648) * don't copy return df * explicity copy df before calling merge * remove copy from normalize * fix test * remove cv fold copy * do not drop ID when normalizing * do not copy when init data params * cleanup * remove copy from global cv intersect * remove copy from create_dict_for_events_or_regressors * handle None events regressors * fix regressors * remove copy from _normalize * remove copy from _make_future_dataframe * remove copy from _prepare_dataframe_to_predict * remove double copy in plotting func * fix typo * remove split deepcopy * retain comments of former copies in split --- neuralprophet/data/process.py | 8 +--- neuralprophet/data/split.py | 11 +++--- neuralprophet/data/transform.py | 4 +- neuralprophet/df_utils.py | 67 +++++++++++++-------------------- neuralprophet/forecaster.py | 37 ++++++++++++------ tests/test_unit.py | 4 +- 6 files changed, 60 insertions(+), 71 deletions(-) diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index 3e099c83c..815e56fe7 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -251,7 +251,6 @@ def _prepare_dataframe_to_predict(model, df: pd.DataFrame, max_lags: int, freq: # Receives df with ID column df_prepared = pd.DataFrame() for df_name, df_i in df.groupby("ID"): - df_i = df_i.copy(deep=True) _ = df_utils.infer_frequency(df_i, n_lags=max_lags, freq=freq) # check if received pre-processed df if "y_scaled" in df_i.columns or "t" in df_i.columns: @@ -283,7 +282,7 @@ def _prepare_dataframe_to_predict(model, df: pd.DataFrame, max_lags: int, freq: config_seasonality=model.config_seasonality, predicting=True, ) - df_prepared = pd.concat((df_prepared, df_i.copy(deep=True).reset_index(drop=True)), ignore_index=True) + df_prepared = pd.concat((df_prepared, df_i.reset_index(drop=True)), ignore_index=True) return df_prepared @@ -399,8 +398,6 @@ def _check_dataframe( "Dataframe has less than n_forecasts + n_lags rows. " "Forecasting not possible. Please either use a larger dataset, or adjust the model parameters." ) - # df = df.copy(deep=True) - # df, _, _, _ = df_utils.check_multiple_series_id(df) df, regressors_to_remove, lag_regressors_to_remove = df_utils.check_dataframe( df=df, check_y=check_y, @@ -475,9 +472,6 @@ def _handle_missing_data( The pre-processed DataFrame, including imputed missing data, if applicable. """ - # df = df.copy(deep=True) - # df, _, _, _ = df_utils.check_multiple_series_id(df) - if n_lags == 0 and not predicting: # drop rows with NaNs in y and count them df_na_dropped = df.dropna(subset=["y"]) diff --git a/neuralprophet/data/split.py b/neuralprophet/data/split.py index 8a39700f6..6824cdfc2 100644 --- a/neuralprophet/data/split.py +++ b/neuralprophet/data/split.py @@ -65,7 +65,7 @@ def _maybe_extend_df( future_df["ID"] = df_name df_i = pd.concat([df_i, future_df]) df_i.reset_index(drop=True, inplace=True) - extended_df = pd.concat((extended_df, df_i.copy(deep=True)), ignore_index=True) + extended_df = pd.concat((extended_df, df_i), ignore_index=True) return extended_df, periods_add @@ -126,8 +126,8 @@ def _get_maybe_extend_periods( def _make_future_dataframe( model, df: pd.DataFrame, - events_df: pd.DataFrame, - regressors_df: pd.DataFrame, + events_df: Optional[pd.DataFrame], + regressors_df: Optional[pd.DataFrame], periods: Optional[int], n_historic_predictions: int, n_forecasts: int, @@ -174,13 +174,12 @@ def _make_future_dataframe( log.warning( "Not extending df into future as no periods specified. You can skip this and predict directly instead." ) - df = df.copy(deep=True) _ = df_utils.infer_frequency(df, n_lags=max_lags, freq=freq) last_date = pd.to_datetime(df["ds"].copy(deep=True).dropna()).sort_values().max() if events_df is not None: - events_df = events_df.copy(deep=True).reset_index(drop=True) + events_df = events_df.reset_index(drop=True) if regressors_df is not None: - regressors_df = regressors_df.copy(deep=True).reset_index(drop=True) + regressors_df = regressors_df.reset_index(drop=True) if periods is None: periods = 1 if max_lags == 0 else n_forecasts else: diff --git a/neuralprophet/data/transform.py b/neuralprophet/data/transform.py index e79518af3..ef743651d 100644 --- a/neuralprophet/data/transform.py +++ b/neuralprophet/data/transform.py @@ -24,13 +24,11 @@ def _normalize(df: pd.DataFrame, config_normalization: Normalization) -> pd.Data ------- df: pd.DataFrame, normalized """ - # df = df.copy(deep=True) - # df, _, _, _ = df_utils.check_multiple_series_id(df) df_norm = pd.DataFrame() for df_name, df_i in df.groupby("ID"): data_params = config_normalization.get_data_params(df_name) df_i.drop("ID", axis=1, inplace=True) - df_aux = df_utils.normalize(df_i, data_params).copy(deep=True) + df_aux = df_utils.normalize(df_i, data_params) df_aux["ID"] = df_name df_norm = pd.concat((df_norm, df_aux), ignore_index=True) return df_norm diff --git a/neuralprophet/df_utils.py b/neuralprophet/df_utils.py index 0252fdf72..9a6c76d6d 100644 --- a/neuralprophet/df_utils.py +++ b/neuralprophet/df_utils.py @@ -77,12 +77,11 @@ def return_df_in_original_format(df, received_ID_col=False, received_single_time pd.Dataframe original input format """ - new_df = df.copy(deep=True) if not received_ID_col and received_single_time_series: - assert len(new_df["ID"].unique()) == 1 - new_df.drop("ID", axis=1, inplace=True) + assert len(df["ID"].unique()) == 1 + df.drop("ID", axis=1, inplace=True) log.info("Returning df with no ID column") - return new_df + return df def merge_dataframes(df: pd.DataFrame) -> pd.DataFrame: @@ -102,7 +101,7 @@ def merge_dataframes(df: pd.DataFrame) -> pd.DataFrame: raise ValueError("Can not join other than pd.DataFrames") if "ID" not in df.columns: raise ValueError("df does not contain 'ID' column") - df_merged = df.copy(deep=True).drop("ID", axis=1) + df_merged = df.drop("ID", axis=1) df_merged = df_merged.sort_values("ds") df_merged = df_merged.drop_duplicates(subset=["ds"]) df_merged = df_merged.reset_index(drop=True) @@ -282,11 +281,8 @@ def init_data_params( ShiftScale entries containing ``shift`` and ``scale`` parameters for each column """ # Compute Global data params - # df = df.copy(deep=True) - # df, _, _, _ = check_multiple_series_id(df) - df_merged = df.copy(deep=True).drop("ID", axis=1) global_data_params = data_params_definition( - df_merged, normalize, config_lagged_regressors, config_regressors, config_events, config_seasonality + df, normalize, config_lagged_regressors, config_regressors, config_events, config_seasonality ) if global_normalization: log.debug( @@ -296,7 +292,6 @@ def init_data_params( local_data_params = OrderedDict() local_run_despite_global = True if global_normalization else None for df_name, df_i in df.groupby("ID"): - df_i.drop("ID", axis=1, inplace=True) local_data_params[df_name] = data_params_definition( df=df_i, normalize=normalize, @@ -378,7 +373,6 @@ def normalize(df, data_params): pd.DataFrame normalized dataframes """ - df = df.copy(deep=True) for name in df.columns: if name == "ID": continue @@ -428,8 +422,7 @@ def check_dataframe( pd.DataFrame or dict checked dataframe """ - # df = df.copy(deep=True) - # df, _, _, _ = check_multiple_series_id(df) + # TODO: move call to check_multiple_series_id here if df.groupby("ID").size().min() < 1: raise ValueError("Dataframe has no rows.") if "ds" not in df: @@ -542,7 +535,7 @@ def _crossvalidation_split_df(df, n_lags, n_forecasts, k, fold_pct, fold_overlap min_train = total_samples - samples_fold - (k - 1) * (samples_fold - samples_overlap) assert min_train >= samples_fold folds = [] - df_fold = df.copy(deep=True) + df_fold = df for i in range(k, 0, -1): df_train, df_val = split_df(df_fold, n_lags, n_forecasts, valid_p=samples_fold, inputs_overbleed=True) folds.append((df_train, df_val)) @@ -635,7 +628,7 @@ def _crossvalidation_with_time_threshold(df, n_lags, n_forecasts, k, fold_pct, f validation data """ - df_merged = merge_dataframes(df) + df_merged = merge_dataframes(df.copy(deep=True)) total_samples = len(df_merged) - n_lags + 2 - (2 * n_forecasts) samples_fold = max(1, int(fold_pct * total_samples)) samples_overlap = int(fold_overlap_pct * samples_fold) @@ -643,25 +636,22 @@ def _crossvalidation_with_time_threshold(df, n_lags, n_forecasts, k, fold_pct, f min_train = total_samples - samples_fold - (k - 1) * (samples_fold - samples_overlap) assert min_train >= samples_fold folds = [] - df_fold = df - # df_fold = df.copy(deep=True) - # df_fold, _, _, _ = check_multiple_series_id(df_fold) for i in range(k, 0, -1): - threshold_time_stamp = find_time_threshold(df_fold, n_lags, n_forecasts, samples_fold, inputs_overbleed=True) + threshold_time_stamp = find_time_threshold(df, n_lags, n_forecasts, samples_fold, inputs_overbleed=True) df_train, df_val = split_considering_timestamp( - df_fold, n_lags, n_forecasts, inputs_overbleed=True, threshold_time_stamp=threshold_time_stamp + df, n_lags, n_forecasts, inputs_overbleed=True, threshold_time_stamp=threshold_time_stamp ) folds.append((df_train, df_val)) split_idx = len(df_merged) - samples_fold + samples_overlap df_merged = df_merged[:split_idx].reset_index(drop=True) threshold_time_stamp = df_merged["ds"].iloc[-1] df_fold_aux = pd.DataFrame() - for df_name, df_i in df_fold.groupby("ID"): - df_aux = ( - df_i.copy(deep=True).iloc[: len(df_i[df_i["ds"] < threshold_time_stamp]) + 1].reset_index(drop=True) - ) + for df_name, df_i in df.groupby("ID"): + # df_i = df_i.copy(deep=True) + df_aux = df_i.iloc[: len(df_i[df_i["ds"] < threshold_time_stamp]) + 1].reset_index(drop=True) df_fold_aux = pd.concat((df_fold_aux, df_aux), ignore_index=True) - df_fold = df_fold_aux.copy(deep=True) + df = df_fold_aux + # df = df.copy(deep=True) folds = folds[::-1] return folds @@ -707,7 +697,6 @@ def crossvalidation_split_df( validation data """ - # df = df.copy(deep=True) df, _, _, _ = check_multiple_series_id(df) folds = [] if len(df["ID"].unique()) == 1: @@ -733,7 +722,7 @@ def crossvalidation_split_df( start_date, end_date = find_valid_time_interval_for_cv(df) for df_name, df_i in df.groupby("ID"): mask = (df_i["ds"] >= start_date) & (df_i["ds"] <= end_date) - df_i = df_i[mask].copy(deep=True) + df_i = df_i[mask] folds_dict[df_name] = _crossvalidation_split_df( df_i, n_lags, n_forecasts, k, fold_pct, fold_overlap_pct ) @@ -768,8 +757,6 @@ def double_crossvalidation_split_df(df, n_lags, n_forecasts, k, valid_pct, test_ tuple of k tuples [(folds_val, folds_test), …] elements same as :meth:`crossvalidation_split_df` returns """ - # df = df.copy(deep=True) - # df, _, _, _ = check_multiple_series_id(df) if len(df["ID"].unique()) > 1: raise NotImplementedError("double_crossvalidation_split_df not implemented for df with many time series") fold_pct_test = float(test_pct) / k @@ -800,7 +787,7 @@ def find_time_threshold(df, n_lags, n_forecasts, valid_p, inputs_overbleed): str time stamp threshold defines the boundary for the train and validation sets split. """ - df_merged = merge_dataframes(df) + df_merged = merge_dataframes(df.copy(deep=True)) n_samples = len(df_merged) - n_lags + 2 - (2 * n_forecasts) n_samples = n_samples if inputs_overbleed else n_samples - n_lags if 0.0 < valid_p < 1.0: @@ -842,11 +829,14 @@ def split_considering_timestamp(df, n_lags, n_forecasts, inputs_overbleed, thres df_val = pd.DataFrame() for df_name, df_i in df.groupby("ID"): if df[df["ID"] == df_name]["ds"].max() < threshold_time_stamp: - df_train = pd.concat((df_train, df_i.copy(deep=True)), ignore_index=True) + # df_i = df_i.copy(deep=True) + df_train = pd.concat((df_train, df_i), ignore_index=True) elif df[df["ID"] == df_name]["ds"].min() > threshold_time_stamp: - df_val = pd.concat((df_val, df_i.copy(deep=True)), ignore_index=True) + # df_i = df_i.copy(deep=True) + df_val = pd.concat((df_val, df_i), ignore_index=True) else: - df_aux = df_i.copy(deep=True) + df_aux = df_i + # df_i = df_i.copy(deep=True) n_train = len(df_aux[df_aux["ds"] < threshold_time_stamp]) split_idx_train = n_train + n_lags + n_forecasts - 1 split_idx_val = split_idx_train - n_lags if inputs_overbleed else split_idx_train @@ -890,8 +880,6 @@ def split_df( pd.DataFrame, dict validation data """ - # df = df.copy(deep=True) - # df, _, _, _ = check_multiple_series_id(df) df_train = pd.DataFrame() df_val = pd.DataFrame() if local_split: @@ -1373,8 +1361,6 @@ def infer_frequency(df, freq, n_lags, min_freq_percentage=0.7): Valid frequency tag according to major frequency. """ - # df = df.copy(deep=True) - # df, _, _, _ = check_multiple_series_id(df) freq_df = list() for df_name, df_i in df.groupby("ID"): freq_df.append(_infer_frequency(df_i, freq, min_freq_percentage)) @@ -1396,6 +1382,7 @@ def create_dict_for_events_or_regressors( df: pd.DataFrame, other_df: Optional[pd.DataFrame], other_df_name: str, + received_ID_col: bool, ) -> dict: # Not sure about the naming of this function """Create a dict for events or regressors according to input df. @@ -1417,12 +1404,10 @@ def create_dict_for_events_or_regressors( if other_df is None: # if other_df is None, create dictionary with None for each ID return {df_name: None for df_name in df_names} - other_df = other_df.copy(deep=True) - other_df, received_ID_col, _, _ = check_multiple_series_id(other_df) # if other_df does not contain ID, create dictionary with original ID with the same other_df for each ID if not received_ID_col: other_df = other_df.drop("ID", axis=1) - return {df_name: other_df.copy(deep=True) for df_name in df_names} + return {df_name: other_df for df_name in df_names} # else, other_df does contain ID, create dict with respective IDs df_unique_names, other_df_unique_names = list(df["ID"].unique()), list(other_df["ID"].unique()) @@ -1438,7 +1423,7 @@ def create_dict_for_events_or_regressors( df_other_dict = {} for df_name in df_unique_names: if df_name in other_df_unique_names: - df_aux = other_df[other_df["ID"] == df_name].reset_index(drop=True).copy(deep=True) + df_aux = other_df[other_df["ID"] == df_name].reset_index(drop=True) df_aux.drop("ID", axis=1, inplace=True) else: df_aux = None diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index e95a4bcaa..429b28baf 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -1195,7 +1195,7 @@ def fit( # Configure auto-seasoanlities and country-holidays if not self.fitted: # Temporarily merge df - df_merged = df_utils.merge_dataframes(df) + df_merged = df_utils.merge_dataframes(df.copy(deep=True)) self.config_seasonality = utils.set_auto_seasonalities( df_merged, config_seasonality=self.config_seasonality ) @@ -1245,8 +1245,6 @@ def fit( config_seasonality=self.config_seasonality, predicting=False, ) - # df_val = df_val.copy(deep=True) - # df_val, _, _, _ = df_utils.check_multiple_series_id(df_val) df_val = _normalize(df=df_val, config_normalization=self.config_normalization) val_components_stacker = utils_time_dataset.ComponentStacker( n_lags=self.config_ar.n_lags, @@ -1926,7 +1924,11 @@ def create_df_with_events(self, df: pd.DataFrame, events_df: pd.DataFrame): df = df.copy(deep=True) df, received_ID_col, received_single_time_series, _ = df_utils.check_multiple_series_id(df) df = _check_dataframe(self, df, check_y=True, exogenous=False) - df_dict_events = df_utils.create_dict_for_events_or_regressors(df, events_df, "events") + events_df = events_df.copy(deep=True) + events_df, events_df_received_ID_col, _, _ = df_utils.check_multiple_series_id(events_df) + df_dict_events = df_utils.create_dict_for_events_or_regressors( + df, events_df, "events", events_df_received_ID_col + ) df_created = pd.DataFrame() for df_name, df_i in df.groupby("ID"): for name in df_dict_events[df_name]["event"].unique(): @@ -2002,16 +2004,27 @@ def make_future_dataframe( """ df = df.copy(deep=True) df, received_ID_col, received_single_time_series, _ = df_utils.check_multiple_series_id(df) - events_dict = df_utils.create_dict_for_events_or_regressors(df, events_df, "events") - regressors_dict = df_utils.create_dict_for_events_or_regressors(df, regressors_df, "regressors") + + if events_df is not None: + events_df = events_df.copy(deep=True) + events_df, events_df_received_ID_col, _, _ = df_utils.check_multiple_series_id(events_df) + events_dict = df_utils.create_dict_for_events_or_regressors( + df, events_df, "events", events_df_received_ID_col + ) + if regressors_df is not None: + regressors_df = regressors_df.copy(deep=True) + regressors_df, regressors_df_received_ID_col, _, _ = df_utils.check_multiple_series_id(regressors_df) + regressors_dict = df_utils.create_dict_for_events_or_regressors( + df, regressors_df, "regressors", regressors_df_received_ID_col + ) df_future_dataframe = pd.DataFrame() for df_name, df_i in df.groupby("ID"): df_aux = _make_future_dataframe( model=self, df=df_i, - events_df=events_dict[df_name], - regressors_df=regressors_dict[df_name], + events_df=events_dict[df_name] if events_df is not None else None, + regressors_df=regressors_dict[df_name] if regressors_df is not None else None, periods=periods, n_historic_predictions=n_historic_predictions, n_forecasts=self.config_model.n_forecasts, @@ -2319,7 +2332,7 @@ def plot( plotted." ) else: - fcst = fcst[fcst["ID"] == df_name].copy(deep=True) + fcst = fcst[fcst["ID"] == df_name] log.info(f"Plotting data from ID {df_name}") if forecast_in_focus is None: forecast_in_focus = self.highlight_forecast_step_n @@ -2435,7 +2448,7 @@ def get_latest_forecast( "forecasted. " ) else: - fcst = fcst[fcst["ID"] == df_name].copy(deep=True) + fcst = fcst[fcst["ID"] == df_name] log.info(f"Getting data from ID {df_name}") if include_history_data is None: fcst = fcst[-(include_previous_forecasts + self.config_model.n_forecasts + self.config_model.max_lags) :] @@ -2512,7 +2525,7 @@ def plot_latest_forecast( "Please, especify ID to be plotted." ) else: - fcst = fcst[fcst["ID"] == df_name].copy(deep=True) + fcst = fcst[fcst["ID"] == df_name] log.info(f"Plotting data from ID {df_name}") if len(self.config_model.quantiles) > 1: log.warning( @@ -2649,7 +2662,7 @@ def plot_components( "Please, especify ID to be plotted." ) else: - fcst = fcst[fcst["ID"] == df_name].copy(deep=True) + fcst = fcst[fcst["ID"] == df_name] log.info(f"Plotting data from ID {df_name}") else: if df_name is None: diff --git a/tests/test_unit.py b/tests/test_unit.py index 7f42b1495..f115618f8 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -158,8 +158,8 @@ def test_normalize(): # using config for utils df = df.drop("ID", axis=1) - df_utils.normalize(df, m.config_normalization.global_data_params) - df_utils.normalize(df, m.config_normalization.local_data_params["__df__"]) + _ = df_utils.normalize(df.copy(deep=True), m.config_normalization.global_data_params) + _ = df_utils.normalize(df.copy(deep=True), m.config_normalization.local_data_params["__df__"]) def test_normalize_utils():