Calibrate relevant subset in dynamic lock #317

Merged · 15 commits · Aug 22, 2024
6 changes: 4 additions & 2 deletions alphadia/outputtransform.py
@@ -965,12 +965,14 @@ def _build_run_internal_df(
     )
     raw_name = os.path.basename(folder_path)

-    internal_dict = {}
+    internal_dict = {
+        "run": raw_name,
+    }

     if os.path.exists(timing_manager_path):
         timing_manager = manager.TimingManager(path=timing_manager_path)
         for key in timing_manager.timings:
-            internal_dict[f"{key}_duration"] = [timing_manager.timings[key]["duration"]]
+            internal_dict[f"duration_{key}"] = [timing_manager.timings[key]["duration"]]

     else:
         logger.warning(f"Error reading timing manager for {raw_name}")
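With this change the internal run dataframe leads with the run name and stores each timing as a `duration_<step>` column. A small sketch of the resulting frame, assuming a timings dict shaped like the one the TimingManager stores (`{step: {"duration": seconds}}`):

```python
import pandas as pd

# Hypothetical timings in the TimingManager's shape: {step: {"duration": seconds}}
timings = {"extraction": {"duration": 12.3}, "fdr": {"duration": 4.5}}

internal_dict = {"run": "run_0"}
for key in timings:
    internal_dict[f"duration_{key}"] = [timings[key]["duration"]]

internal_df = pd.DataFrame(internal_dict)  # the scalar "run" broadcasts to one row
print(internal_df.columns.tolist())  # ['run', 'duration_extraction', 'duration_fdr']
```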
54 changes: 43 additions & 11 deletions alphadia/workflow/optimization.py
@@ -397,7 +397,7 @@ def _propose_new_parameter(self, df: pd.DataFrame):
         """
         return 1.1 * self.workflow.calibration_manager.get_estimator(
             self.estimator_group_name, self.estimator_name
-        ).ci(df, 0.975)
+        ).ci(df, 0.99)

     def _get_feature_value(
         self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame
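The same proposal rule is updated in all four optimizer classes in this file: the next search parameter is 1.1× the estimator's confidence interval over the calibration data, now taken at 0.99 instead of 0.975, which widens the proposed value. A minimal sketch of the idea, assuming a hypothetical `ci` that returns the half-width containing the requested fraction of absolute residuals:

```python
import numpy as np

def propose_new_parameter(residuals: np.ndarray, quantile: float = 0.99) -> float:
    """Sketch: pad the residual spread by 10% to get the next search window."""
    ci_half_width = np.quantile(np.abs(residuals), quantile)
    return 1.1 * ci_half_width

# Example: retention-time residuals left after calibration
rng = np.random.default_rng(0)
print(propose_new_parameter(rng.normal(0.0, 0.5, size=1000)))  # roughly 1.4 under normality
```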
@@ -434,7 +434,7 @@ def _propose_new_parameter(self, df: pd.DataFrame):
         """
         return 1.1 * self.workflow.calibration_manager.get_estimator(
             self.estimator_group_name, self.estimator_name
-        ).ci(df, 0.975)
+        ).ci(df, 0.99)

     def _get_feature_value(
         self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame
@@ -472,7 +472,7 @@ def _propose_new_parameter(self, df: pd.DataFrame):
         """
         return 1.1 * self.workflow.calibration_manager.get_estimator(
             self.estimator_group_name, self.estimator_name
-        ).ci(df, 0.975)
+        ).ci(df, 0.99)

     def _get_feature_value(
         self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame
@@ -510,7 +510,7 @@ def _propose_new_parameter(self, df: pd.DataFrame):

         return 1.1 * self.workflow.calibration_manager.get_estimator(
             self.estimator_group_name, self.estimator_name
-        ).ci(df, 0.975)
+        ).ci(df, 0.99)

     def _get_feature_value(
         self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame
@@ -593,7 +593,7 @@ def __init__(self, library, config: dict):
         self._library = library
         self._config = config

-        self.first_time_to_reach_target = True
+        self.previously_calibrated = False
         self.has_target_num_precursors = False

         self.elution_group_order = library._precursor_df["elution_group_idx"].unique()
@@ -606,9 +606,7 @@ def __init__(self, library, config: dict):
         self.set_batch_plan()

         eg_idxes = self.elution_group_order[self.start_idx : self.stop_idx]
-        self.batch_precursor_df = self._library._precursor_df[
-            self._library._precursor_df["elution_group_idx"].isin(eg_idxes)
-        ]
+        self.set_batch_dfs(eg_idxes)

         self.feature_dfs = []
         self.fragment_dfs = []
@@ -654,7 +652,7 @@ def update_with_extraction(self, feature_df, fragment_df):
         self.feature_dfs += [feature_df]
         self.fragment_dfs += [fragment_df]

-        self.total_precursors = len(self.features_df)
+        self.total_precursors = self.features_df.precursor_idx.nunique()

     def update_with_fdr(self, precursor_df):
         """Calculates the number of precursors at 1% FDR for the current optimization lock and determines if it is sufficient to perform calibration and optimization.
@@ -704,7 +702,6 @@ def update(self):
             ]  # Take only additional elution groups in the new step of the batch plan. The target has not been reached and hence recalibration and optimization have not been performed, so previous extractions can be re-used.

         else:
-            self.first_time_to_reach_target = False
             self.decrease_batch_idx()
             eg_idxes = self.elution_group_order[
                 : self.stop_idx
@@ -713,9 +710,44 @@ def update(self):
             self.fragment_dfs = []

         # Sets the batch to be used in the next round of optimization. Note that this batch will only include additional precursors if the target was not reached in this step (as the precursors from this step are stored in the feature_dfs attribute).
+        self.set_batch_dfs(eg_idxes)
+
+    def set_batch_dfs(self, eg_idxes):
         self.batch_precursor_df = self._library._precursor_df[
             self._library._precursor_df["elution_group_idx"].isin(eg_idxes)
-        ]
+        ].copy()
+
+        start_indices = self.batch_precursor_df["flat_frag_start_idx"].values
+        stop_indices = self.batch_precursor_df["flat_frag_stop_idx"].values
+
+        # Create ranges for each row and concatenate them
+        fragment_idxes = np.concatenate(
+            [
+                np.arange(start, stop)
+                for start, stop in zip(start_indices, stop_indices, strict=True)
+            ]
+        )
+
+        # Extract the fragments for the optimization lock and reset the indices to a consecutive range of integers starting at zero. This simplifies future access by position.
+        self.batch_fragment_df = self._library._fragment_df.iloc[fragment_idxes].copy()
+
+        self.reindex_fragments()
+
+    def reindex_fragments(self):
+        """
+        Reindex the fragment dataframe to a consecutive range of integers starting at zero. Change the precursor dataframe so that its fragment indices match the reindexed fragment dataframe.
+        """
+        self.batch_fragment_df.reset_index(drop=True, inplace=True)
+
+        # Change the fragment indices in batch_precursor_df to match the reindexed batch_fragment_df instead of the full spectral library.
+        num_frags = (
+            self.batch_precursor_df["flat_frag_stop_idx"]
+            - self.batch_precursor_df["flat_frag_start_idx"]
+        )
+        self.batch_precursor_df["flat_frag_stop_idx"] = num_frags.cumsum()
+        self.batch_precursor_df["flat_frag_start_idx"] = self.batch_precursor_df[
+            "flat_frag_stop_idx"
+        ].shift(fill_value=0)

     @property
     def features_df(self):
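Taken together, `set_batch_dfs` and `reindex_fragments` convert library-wide flat fragment ranges into batch-local ones. A worked toy example of the two steps, reusing the column names above on hypothetical values:

```python
import numpy as np
import pandas as pd

# Toy library: two precursors owning fragments at library positions [5, 8) and [20, 22)
precursor_df = pd.DataFrame(
    {"flat_frag_start_idx": [5, 20], "flat_frag_stop_idx": [8, 22]}
)
fragment_df = pd.DataFrame({"mz": np.arange(100.0, 130.0)})  # 30 library fragments

# Step 1 (as in set_batch_dfs): gather exactly the referenced fragment rows
fragment_idxes = np.concatenate(
    [
        np.arange(start, stop)
        for start, stop in zip(
            precursor_df["flat_frag_start_idx"],
            precursor_df["flat_frag_stop_idx"],
            strict=True,
        )
    ]
)
batch_fragment_df = fragment_df.iloc[fragment_idxes].reset_index(drop=True)  # 5 rows

# Step 2 (as in reindex_fragments): rewrite the ranges to index into the batch table
num_frags = precursor_df["flat_frag_stop_idx"] - precursor_df["flat_frag_start_idx"]
precursor_df["flat_frag_stop_idx"] = num_frags.cumsum()  # [3, 5]
precursor_df["flat_frag_start_idx"] = precursor_df["flat_frag_stop_idx"].shift(
    fill_value=0
)  # [0, 3]
```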
73 changes: 27 additions & 46 deletions alphadia/workflow/peptidecentric.py
@@ -352,32 +352,6 @@ def get_ordered_optimizers(self):

         return ordered_optimizers

-    def first_recalibration(
-        self,
-        precursor_df: pd.DataFrame,
-        fragments_df: pd.DataFrame,
-    ):
-        """Performs the first recalibration and optimization step.
-
-        Parameters
-        ----------
-        precursor_df : pd.DataFrame
-            Precursor dataframe from optimization lock
-
-        fragments_df : pd.DataFrame
-            Fragment dataframe from optimization lock
-
-        """
-
-        self.optimization_manager.fit(
-            {"classifier_version": self.fdr_manager.current_version}
-        )
-        precursor_df_filtered, fragments_df_filtered = self.filter_dfs(
-            precursor_df, fragments_df
-        )
-
-        self.recalibration(precursor_df_filtered, fragments_df_filtered)
-
     def calibration(self):
         """Performs optimization of the search parameters. This occurs in two stages:
         1) Optimization lock: the data are searched to acquire a locked set of precursors which is used for search parameter optimization. The classifier is also trained during this stage.
@@ -428,7 +402,7 @@ def calibration(self):
             )

             feature_df, fragment_df = self.extract_batch(
-                self.optlock.batch_precursor_df
+                self.optlock.batch_precursor_df, self.optlock.batch_fragment_df
             )
             self.optlock.update_with_extraction(feature_df, fragment_df)

@@ -453,23 +427,25 @@ def calibration(self):
             self.log_precursor_df(precursor_df)

             if self.optlock.has_target_num_precursors:
-                if self.optlock.first_time_to_reach_target:  # Recalibrates and updates classifier, but does not optimize the first time the target is reached. Optimization is more stable when done with calibrated values.
-                    self.first_recalibration(
-                        precursor_df, self.optlock.fragments_df
-                    )
-                    self.optlock.update()
+                precursor_df_filtered, fragments_df_filtered = self.filter_dfs(
+                    precursor_df, self.optlock.fragments_df
+                )
+
+                self.recalibration(precursor_df_filtered, fragments_df_filtered)
+
+                if not self.optlock.previously_calibrated:  # Updates classifier but does not optimize the first time the target is reached. Optimization is more stable when done with calibrated values.
+                    self.optlock.previously_calibrated = True
+                    self.optimization_manager.fit(
+                        {"classifier_version": self.fdr_manager.current_version}
+                    )
+                    self.optlock.update()
                     self.reporter.log_string(
                         "Required number of precursors found. Starting search parameter optimization.",
                         verbosity="progress",
                     )
                     continue

-                precursor_df_filtered, fragments_df_filtered = self.filter_dfs(
-                    precursor_df, self.optlock.fragments_df
-                )
-
-                self.recalibration(precursor_df_filtered, fragments_df_filtered)
-
                 self.reporter.log_string(
                     "=== checking if optimization conditions were reached ===",
                     verbosity="info",
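For orientation, a condensed sketch of the reordered first-pass handling above, with hypothetical free-standing names rather than the actual method: filtering and recalibration now always run once the target is reached, and only afterwards does the first pass refit the classifier version, rebuild the batch, and restart the loop instead of optimizing.

```python
def handle_target_reached(workflow, optlock, precursor_df):
    """Sketch of the control flow only; not the actual alphaDIA method."""
    filtered_precursors, filtered_fragments = workflow.filter_dfs(
        precursor_df, optlock.fragments_df
    )
    workflow.recalibration(filtered_precursors, filtered_fragments)  # always recalibrate

    if not optlock.previously_calibrated:
        # First pass at the target: record the classifier version, rebuild the
        # batch, and defer optimization to the next, now-calibrated iteration.
        optlock.previously_calibrated = True
        workflow.optimization_manager.fit(
            {"classifier_version": workflow.fdr_manager.current_version}
        )
        optlock.update()
        return "restart"
    return "optimize"
```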
@@ -487,9 +463,10 @@
                         f"=== Optimization has been performed {optimizer.num_prev_optimizations} times; minimum number is {self.config['calibration']['min_steps']} ===",
                         verbosity="progress",
                     )

-            self.optlock.update()
-
-        else:
+            self.optlock.update()
+            if self.optlock.previously_calibrated:
+                self.apply_calibration()
+        else:
             self.reporter.log_string(
                 f"Optimization did not converge within the maximum number of steps, which is {self.config['calibration']['max_steps']}.",
@@ -594,12 +571,7 @@ def recalibration(self, precursor_df_filtered, fragments_df_filtered):
             # neptune_run = self.neptune
         )

-        self.calibration_manager.predict(
-            self.spectral_library._precursor_df,
-            "precursor",
-        )
-
-        self.calibration_manager.predict(self.spectral_library._fragment_df, "fragment")
+        self.apply_calibration()

         self.optimization_manager.fit(
             {
@@ -617,6 +589,15 @@ def recalibration(self, precursor_df_filtered, fragments_df_filtered):
             }
         )

+    def apply_calibration(self):
+        """Applies the current calibration by predicting values for the optimization lock batch."""
+        self.calibration_manager.predict(
+            self.optlock.batch_precursor_df,
+            "precursor",
+        )
+
+        self.calibration_manager.predict(self.optlock.batch_fragment_df, "fragment")
+
     def fdr_correction(self, features_df, df_fragments, version=-1):
         return self.fdr_manager.fit_predict(
             features_df,
2 changes: 1 addition & 1 deletion tests/unit_tests/test_outputtransform.py
@@ -144,7 +144,7 @@ def test_output_transform():
     internal_df = pd.read_csv(
         os.path.join(temp_folder, f"{output.INTERNAL_OUTPUT}.tsv"), sep="\t"
     )
-    assert isinstance(internal_df["extraction_duration"][0], float)
+    assert isinstance(internal_df["duration_extraction"][0], float)
     # validate protein_df output
     protein_df = pd.read_parquet(os.path.join(temp_folder, "pg.matrix.parquet"))
    assert all([col in protein_df.columns for col in ["run_0", "run_1", "run_2"]])
88 changes: 75 additions & 13 deletions tests/unit_tests/test_workflow.py
@@ -781,13 +781,48 @@ def test_targeted_mobility_optimizer():

 class TEST_LIBRARY:
     def __init__(self):
-        idxes = np.arange(0, 100000)
-        self._precursor_df = pd.DataFrame({"elution_group_idx": idxes})
-        self._fragments_df = pd.DataFrame(
-            {
-                "precursor_idx": np.concatenate(
-                    [np.full(10, precursor_idx) for precursor_idx in idxes]
-                )
-            }
-        )
+        precursor_idx = np.arange(100000)
+        elution_group_idx = np.concatenate(
+            [np.full(2, i, dtype=int) for i in np.arange(len(precursor_idx) / 2)]
+        )
+        flat_frag_start_idx = precursor_idx * 10
+        flat_frag_stop_idx = (precursor_idx + 1) * 10
+        self._precursor_df = pd.DataFrame(
+            {
+                "elution_group_idx": elution_group_idx,
+                "precursor_idx": precursor_idx,
+                "flat_frag_start_idx": flat_frag_start_idx,
+                "flat_frag_stop_idx": flat_frag_stop_idx,
+            }
+        )
+
+        self._fragment_df = pd.DataFrame(
+            {
+                "precursor_idx": np.arange(0, flat_frag_stop_idx[-1]),
+            }
+        )
+
+
+class TEST_LIBRARY_INDEX:
+    def __init__(self):
+        precursor_idx = np.arange(1000)
+        elution_group_idx = np.concatenate(
+            [np.full(2, i, dtype=int) for i in np.arange(len(precursor_idx) / 2)]
+        )
+        flat_frag_start_idx = precursor_idx**2
+        flat_frag_stop_idx = (precursor_idx + 1) ** 2
+        self._precursor_df = pd.DataFrame(
+            {
+                "elution_group_idx": elution_group_idx,
+                "precursor_idx": precursor_idx,
+                "flat_frag_start_idx": flat_frag_start_idx,
+                "flat_frag_stop_idx": flat_frag_stop_idx,
+            }
+        )
+
+        self._fragment_df = pd.DataFrame(
+            {
+                "precursor_idx": np.arange(0, flat_frag_stop_idx[-1]),
+            }
+        )

@@ -806,8 +841,8 @@ def test_optlock():

     assert optlock.start_idx == optlock.batch_plan[0][0]

-    feature_df = pd.DataFrame({"col": np.arange(0, 1000)})
-    fragment_df = pd.DataFrame({"col": np.arange(0, 10000)})
+    feature_df = pd.DataFrame({"precursor_idx": np.arange(0, 1000)})
+    fragment_df = pd.DataFrame({"precursor_idx": np.arange(0, 10000)})

     optlock.update_with_extraction(feature_df, fragment_df)

@@ -818,13 +853,13 @@ def test_optlock():
     optlock.update_with_fdr(precursor_df)

     assert optlock.has_target_num_precursors is False
-    assert optlock.first_time_to_reach_target is True
+    assert optlock.previously_calibrated is False
     optlock.update()

     assert optlock.start_idx == optlock.batch_plan[1][0]

-    feature_df = pd.DataFrame({"col": np.arange(0, 1000)})
-    fragment_df = pd.DataFrame({"col": np.arange(0, 10000)})
+    feature_df = pd.DataFrame({"precursor_idx": np.arange(1000, 2000)})
+    fragment_df = pd.DataFrame({"precursor_idx": np.arange(10000, 20000)})

     optlock.update_with_extraction(feature_df, fragment_df)

@@ -837,10 +872,10 @@ def test_optlock():
     optlock.update_with_fdr(precursor_df)

     assert optlock.has_target_num_precursors is True
-    assert optlock.first_time_to_reach_target is True
+    assert optlock.previously_calibrated is False

     optlock.update()

-    assert optlock.first_time_to_reach_target is False
     assert optlock.start_idx == 0


@@ -866,3 +901,30 @@ def test_optlock_batch_idx():

     assert optlock.start_idx == 0
     assert optlock.stop_idx == 2000
+
+
+def test_optlock_reindex():
+    library = TEST_LIBRARY_INDEX()
+    optlock = optimization.OptimizationLock(library, TEST_OPTLOCK_CONFIG)
+    optlock.batch_plan = [[0, 100], [100, 200]]
+    optlock.set_batch_dfs(
+        optlock.elution_group_order[optlock.start_idx : optlock.stop_idx]
+    )
+    optlock.reindex_fragments()
+
+    # Precursor i owns fragments [i**2, (i + 1)**2) in the library, so its fragment
+    # count is (i + 1)**2 - i**2. Reindexing changes the start and stop indices
+    # themselves, but the count must be preserved.
+    assert (
+        optlock.batch_precursor_df["flat_frag_stop_idx"].iloc[100]
+        - optlock.batch_precursor_df["flat_frag_start_idx"].iloc[100]
+    ) == (
+        (optlock.batch_precursor_df["precursor_idx"].iloc[100] + 1) ** 2
+        - optlock.batch_precursor_df["precursor_idx"].iloc[100] ** 2
+    )
+    # The fragment now sitting at a precursor's reindexed start position should be
+    # the one whose original flat library index was precursor_idx**2 (stored in this
+    # fixture's fragment dataframe under the precursor_idx column).
+    assert (
+        optlock.batch_fragment_df.iloc[
+            optlock.batch_precursor_df.iloc[50]["flat_frag_start_idx"]
+        ]["precursor_idx"]
+        == optlock.batch_precursor_df.iloc[50]["precursor_idx"] ** 2
+    )
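A quick check of the arithmetic these assertions rely on (plain Python, independent of the optlock): precursor i owns the half-open fragment range [i², (i+1)²), so its fragment count is 2i + 1 and its first fragment's original flat index is i².

```python
i = 50
start, stop = i**2, (i + 1) ** 2
assert stop - start == 2 * i + 1  # fragment count, preserved by reindexing
assert start == 2500              # original flat index of precursor 50's first fragment
```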