Skip to content

Commit

Permalink
[Minor] Reduce DataFrame copies (#1648)
Browse files Browse the repository at this point in the history
* don't copy return df

* explicity copy df before calling merge

* remove copy from normalize

* fix test

* remove cv fold copy

* do not drop ID when normalizing

* do not copy when init data params

* cleanup

* remove copy from global cv intersect

* remove copy from create_dict_for_events_or_regressors

* handle None events regressors

* fix regressors

* remove copy from _normalize

* remove copy from _make_future_dataframe

* remove copy from _prepare_dataframe_to_predict

* remove double copy in plotting func

* fix typo

* remove split deepcopy

* retain comments of former copies in split
  • Loading branch information
ourownstory authored Sep 13, 2024
1 parent 4904808 commit 5e6b231
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 71 deletions.
8 changes: 1 addition & 7 deletions neuralprophet/data/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ def _prepare_dataframe_to_predict(model, df: pd.DataFrame, max_lags: int, freq:
# Receives df with ID column
df_prepared = pd.DataFrame()
for df_name, df_i in df.groupby("ID"):
df_i = df_i.copy(deep=True)
_ = df_utils.infer_frequency(df_i, n_lags=max_lags, freq=freq)
# check if received pre-processed df
if "y_scaled" in df_i.columns or "t" in df_i.columns:
Expand Down Expand Up @@ -283,7 +282,7 @@ def _prepare_dataframe_to_predict(model, df: pd.DataFrame, max_lags: int, freq:
config_seasonality=model.config_seasonality,
predicting=True,
)
df_prepared = pd.concat((df_prepared, df_i.copy(deep=True).reset_index(drop=True)), ignore_index=True)
df_prepared = pd.concat((df_prepared, df_i.reset_index(drop=True)), ignore_index=True)
return df_prepared


Expand Down Expand Up @@ -399,8 +398,6 @@ def _check_dataframe(
"Dataframe has less than n_forecasts + n_lags rows. "
"Forecasting not possible. Please either use a larger dataset, or adjust the model parameters."
)
# df = df.copy(deep=True)
# df, _, _, _ = df_utils.check_multiple_series_id(df)
df, regressors_to_remove, lag_regressors_to_remove = df_utils.check_dataframe(
df=df,
check_y=check_y,
Expand Down Expand Up @@ -475,9 +472,6 @@ def _handle_missing_data(
The pre-processed DataFrame, including imputed missing data, if applicable.
"""
# df = df.copy(deep=True)
# df, _, _, _ = df_utils.check_multiple_series_id(df)

if n_lags == 0 and not predicting:
# drop rows with NaNs in y and count them
df_na_dropped = df.dropna(subset=["y"])
Expand Down
11 changes: 5 additions & 6 deletions neuralprophet/data/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _maybe_extend_df(
future_df["ID"] = df_name
df_i = pd.concat([df_i, future_df])
df_i.reset_index(drop=True, inplace=True)
extended_df = pd.concat((extended_df, df_i.copy(deep=True)), ignore_index=True)
extended_df = pd.concat((extended_df, df_i), ignore_index=True)
return extended_df, periods_add


Expand Down Expand Up @@ -126,8 +126,8 @@ def _get_maybe_extend_periods(
def _make_future_dataframe(
model,
df: pd.DataFrame,
events_df: pd.DataFrame,
regressors_df: pd.DataFrame,
events_df: Optional[pd.DataFrame],
regressors_df: Optional[pd.DataFrame],
periods: Optional[int],
n_historic_predictions: int,
n_forecasts: int,
Expand Down Expand Up @@ -174,13 +174,12 @@ def _make_future_dataframe(
log.warning(
"Not extending df into future as no periods specified. You can skip this and predict directly instead."
)
df = df.copy(deep=True)
_ = df_utils.infer_frequency(df, n_lags=max_lags, freq=freq)
last_date = pd.to_datetime(df["ds"].copy(deep=True).dropna()).sort_values().max()
if events_df is not None:
events_df = events_df.copy(deep=True).reset_index(drop=True)
events_df = events_df.reset_index(drop=True)
if regressors_df is not None:
regressors_df = regressors_df.copy(deep=True).reset_index(drop=True)
regressors_df = regressors_df.reset_index(drop=True)
if periods is None:
periods = 1 if max_lags == 0 else n_forecasts
else:
Expand Down
4 changes: 1 addition & 3 deletions neuralprophet/data/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ def _normalize(df: pd.DataFrame, config_normalization: Normalization) -> pd.Data
-------
df: pd.DataFrame, normalized
"""
# df = df.copy(deep=True)
# df, _, _, _ = df_utils.check_multiple_series_id(df)
df_norm = pd.DataFrame()
for df_name, df_i in df.groupby("ID"):
data_params = config_normalization.get_data_params(df_name)
df_i.drop("ID", axis=1, inplace=True)
df_aux = df_utils.normalize(df_i, data_params).copy(deep=True)
df_aux = df_utils.normalize(df_i, data_params)
df_aux["ID"] = df_name
df_norm = pd.concat((df_norm, df_aux), ignore_index=True)
return df_norm
67 changes: 26 additions & 41 deletions neuralprophet/df_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,11 @@ def return_df_in_original_format(df, received_ID_col=False, received_single_time
pd.Dataframe
original input format
"""
new_df = df.copy(deep=True)
if not received_ID_col and received_single_time_series:
assert len(new_df["ID"].unique()) == 1
new_df.drop("ID", axis=1, inplace=True)
assert len(df["ID"].unique()) == 1
df.drop("ID", axis=1, inplace=True)
log.info("Returning df with no ID column")
return new_df
return df


def merge_dataframes(df: pd.DataFrame) -> pd.DataFrame:
Expand All @@ -102,7 +101,7 @@ def merge_dataframes(df: pd.DataFrame) -> pd.DataFrame:
raise ValueError("Can not join other than pd.DataFrames")
if "ID" not in df.columns:
raise ValueError("df does not contain 'ID' column")
df_merged = df.copy(deep=True).drop("ID", axis=1)
df_merged = df.drop("ID", axis=1)
df_merged = df_merged.sort_values("ds")
df_merged = df_merged.drop_duplicates(subset=["ds"])
df_merged = df_merged.reset_index(drop=True)
Expand Down Expand Up @@ -282,11 +281,8 @@ def init_data_params(
ShiftScale entries containing ``shift`` and ``scale`` parameters for each column
"""
# Compute Global data params
# df = df.copy(deep=True)
# df, _, _, _ = check_multiple_series_id(df)
df_merged = df.copy(deep=True).drop("ID", axis=1)
global_data_params = data_params_definition(
df_merged, normalize, config_lagged_regressors, config_regressors, config_events, config_seasonality
df, normalize, config_lagged_regressors, config_regressors, config_events, config_seasonality
)
if global_normalization:
log.debug(
Expand All @@ -296,7 +292,6 @@ def init_data_params(
local_data_params = OrderedDict()
local_run_despite_global = True if global_normalization else None
for df_name, df_i in df.groupby("ID"):
df_i.drop("ID", axis=1, inplace=True)
local_data_params[df_name] = data_params_definition(
df=df_i,
normalize=normalize,
Expand Down Expand Up @@ -378,7 +373,6 @@ def normalize(df, data_params):
pd.DataFrame
normalized dataframes
"""
df = df.copy(deep=True)
for name in df.columns:
if name == "ID":
continue
Expand Down Expand Up @@ -428,8 +422,7 @@ def check_dataframe(
pd.DataFrame or dict
checked dataframe
"""
# df = df.copy(deep=True)
# df, _, _, _ = check_multiple_series_id(df)
# TODO: move call to check_multiple_series_id here
if df.groupby("ID").size().min() < 1:
raise ValueError("Dataframe has no rows.")
if "ds" not in df:
Expand Down Expand Up @@ -542,7 +535,7 @@ def _crossvalidation_split_df(df, n_lags, n_forecasts, k, fold_pct, fold_overlap
min_train = total_samples - samples_fold - (k - 1) * (samples_fold - samples_overlap)
assert min_train >= samples_fold
folds = []
df_fold = df.copy(deep=True)
df_fold = df
for i in range(k, 0, -1):
df_train, df_val = split_df(df_fold, n_lags, n_forecasts, valid_p=samples_fold, inputs_overbleed=True)
folds.append((df_train, df_val))
Expand Down Expand Up @@ -635,33 +628,30 @@ def _crossvalidation_with_time_threshold(df, n_lags, n_forecasts, k, fold_pct, f
validation data
"""
df_merged = merge_dataframes(df)
df_merged = merge_dataframes(df.copy(deep=True))
total_samples = len(df_merged) - n_lags + 2 - (2 * n_forecasts)
samples_fold = max(1, int(fold_pct * total_samples))
samples_overlap = int(fold_overlap_pct * samples_fold)
assert samples_overlap < samples_fold
min_train = total_samples - samples_fold - (k - 1) * (samples_fold - samples_overlap)
assert min_train >= samples_fold
folds = []
df_fold = df
# df_fold = df.copy(deep=True)
# df_fold, _, _, _ = check_multiple_series_id(df_fold)
for i in range(k, 0, -1):
threshold_time_stamp = find_time_threshold(df_fold, n_lags, n_forecasts, samples_fold, inputs_overbleed=True)
threshold_time_stamp = find_time_threshold(df, n_lags, n_forecasts, samples_fold, inputs_overbleed=True)
df_train, df_val = split_considering_timestamp(
df_fold, n_lags, n_forecasts, inputs_overbleed=True, threshold_time_stamp=threshold_time_stamp
df, n_lags, n_forecasts, inputs_overbleed=True, threshold_time_stamp=threshold_time_stamp
)
folds.append((df_train, df_val))
split_idx = len(df_merged) - samples_fold + samples_overlap
df_merged = df_merged[:split_idx].reset_index(drop=True)
threshold_time_stamp = df_merged["ds"].iloc[-1]
df_fold_aux = pd.DataFrame()
for df_name, df_i in df_fold.groupby("ID"):
df_aux = (
df_i.copy(deep=True).iloc[: len(df_i[df_i["ds"] < threshold_time_stamp]) + 1].reset_index(drop=True)
)
for df_name, df_i in df.groupby("ID"):
# df_i = df_i.copy(deep=True)
df_aux = df_i.iloc[: len(df_i[df_i["ds"] < threshold_time_stamp]) + 1].reset_index(drop=True)
df_fold_aux = pd.concat((df_fold_aux, df_aux), ignore_index=True)
df_fold = df_fold_aux.copy(deep=True)
df = df_fold_aux
# df = df.copy(deep=True)
folds = folds[::-1]
return folds

Expand Down Expand Up @@ -707,7 +697,6 @@ def crossvalidation_split_df(
validation data
"""
# df = df.copy(deep=True)
df, _, _, _ = check_multiple_series_id(df)
folds = []
if len(df["ID"].unique()) == 1:
Expand All @@ -733,7 +722,7 @@ def crossvalidation_split_df(
start_date, end_date = find_valid_time_interval_for_cv(df)
for df_name, df_i in df.groupby("ID"):
mask = (df_i["ds"] >= start_date) & (df_i["ds"] <= end_date)
df_i = df_i[mask].copy(deep=True)
df_i = df_i[mask]
folds_dict[df_name] = _crossvalidation_split_df(
df_i, n_lags, n_forecasts, k, fold_pct, fold_overlap_pct
)
Expand Down Expand Up @@ -768,8 +757,6 @@ def double_crossvalidation_split_df(df, n_lags, n_forecasts, k, valid_pct, test_
tuple of k tuples [(folds_val, folds_test), …]
elements same as :meth:`crossvalidation_split_df` returns
"""
# df = df.copy(deep=True)
# df, _, _, _ = check_multiple_series_id(df)
if len(df["ID"].unique()) > 1:
raise NotImplementedError("double_crossvalidation_split_df not implemented for df with many time series")
fold_pct_test = float(test_pct) / k
Expand Down Expand Up @@ -800,7 +787,7 @@ def find_time_threshold(df, n_lags, n_forecasts, valid_p, inputs_overbleed):
str
time stamp threshold defines the boundary for the train and validation sets split.
"""
df_merged = merge_dataframes(df)
df_merged = merge_dataframes(df.copy(deep=True))
n_samples = len(df_merged) - n_lags + 2 - (2 * n_forecasts)
n_samples = n_samples if inputs_overbleed else n_samples - n_lags
if 0.0 < valid_p < 1.0:
Expand Down Expand Up @@ -842,11 +829,14 @@ def split_considering_timestamp(df, n_lags, n_forecasts, inputs_overbleed, thres
df_val = pd.DataFrame()
for df_name, df_i in df.groupby("ID"):
if df[df["ID"] == df_name]["ds"].max() < threshold_time_stamp:
df_train = pd.concat((df_train, df_i.copy(deep=True)), ignore_index=True)
# df_i = df_i.copy(deep=True)
df_train = pd.concat((df_train, df_i), ignore_index=True)
elif df[df["ID"] == df_name]["ds"].min() > threshold_time_stamp:
df_val = pd.concat((df_val, df_i.copy(deep=True)), ignore_index=True)
# df_i = df_i.copy(deep=True)
df_val = pd.concat((df_val, df_i), ignore_index=True)
else:
df_aux = df_i.copy(deep=True)
df_aux = df_i
# df_i = df_i.copy(deep=True)
n_train = len(df_aux[df_aux["ds"] < threshold_time_stamp])
split_idx_train = n_train + n_lags + n_forecasts - 1
split_idx_val = split_idx_train - n_lags if inputs_overbleed else split_idx_train
Expand Down Expand Up @@ -890,8 +880,6 @@ def split_df(
pd.DataFrame, dict
validation data
"""
# df = df.copy(deep=True)
# df, _, _, _ = check_multiple_series_id(df)
df_train = pd.DataFrame()
df_val = pd.DataFrame()
if local_split:
Expand Down Expand Up @@ -1373,8 +1361,6 @@ def infer_frequency(df, freq, n_lags, min_freq_percentage=0.7):
Valid frequency tag according to major frequency.
"""
# df = df.copy(deep=True)
# df, _, _, _ = check_multiple_series_id(df)
freq_df = list()
for df_name, df_i in df.groupby("ID"):
freq_df.append(_infer_frequency(df_i, freq, min_freq_percentage))
Expand All @@ -1396,6 +1382,7 @@ def create_dict_for_events_or_regressors(
df: pd.DataFrame,
other_df: Optional[pd.DataFrame],
other_df_name: str,
received_ID_col: bool,
) -> dict: # Not sure about the naming of this function
"""Create a dict for events or regressors according to input df.
Expand All @@ -1417,12 +1404,10 @@ def create_dict_for_events_or_regressors(
if other_df is None:
# if other_df is None, create dictionary with None for each ID
return {df_name: None for df_name in df_names}
other_df = other_df.copy(deep=True)
other_df, received_ID_col, _, _ = check_multiple_series_id(other_df)
# if other_df does not contain ID, create dictionary with original ID with the same other_df for each ID
if not received_ID_col:
other_df = other_df.drop("ID", axis=1)
return {df_name: other_df.copy(deep=True) for df_name in df_names}
return {df_name: other_df for df_name in df_names}

# else, other_df does contain ID, create dict with respective IDs
df_unique_names, other_df_unique_names = list(df["ID"].unique()), list(other_df["ID"].unique())
Expand All @@ -1438,7 +1423,7 @@ def create_dict_for_events_or_regressors(
df_other_dict = {}
for df_name in df_unique_names:
if df_name in other_df_unique_names:
df_aux = other_df[other_df["ID"] == df_name].reset_index(drop=True).copy(deep=True)
df_aux = other_df[other_df["ID"] == df_name].reset_index(drop=True)
df_aux.drop("ID", axis=1, inplace=True)
else:
df_aux = None
Expand Down
Loading

0 comments on commit 5e6b231

Please sign in to comment.