Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip watcher callbacks for FOVs that already have data generated #394

Merged
merged 17 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/toffy/watcher_callbacks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import os
import warnings
from dataclasses import dataclass, field
from typing import Iterable

Expand Down Expand Up @@ -179,6 +180,11 @@ def extract_tiffs(self, tiff_out_dir: str, panel: pd.DataFrame, **kwargs):
if not os.path.exists(tiff_out_dir):
os.makedirs(tiff_out_dir)

extracted_img_dir = os.path.join(tiff_out_dir, self.point_name)
if os.path.exists(extracted_img_dir):
warnings.warn(f"Images already extracted for FOV {self.point_name}")
return

alex-l-kong marked this conversation as resolved.
Show resolved Hide resolved
if self.__fov_data is None:
self._generate_fov_data(panel, **kwargs)

Expand Down Expand Up @@ -216,6 +222,15 @@ def generate_qc(self, qc_out_dir: str, panel: pd.DataFrame = None, **kwargs):
raise ValueError("Must provide panel if fov data is not already generated...")
self._generate_fov_data(panel, **kwargs)

qc_metric_paths = [
os.path.join(qc_out_dir, self.point_name + "_nonzero_mean_stats.csv"),
os.path.join(qc_out_dir, self.point_name + "_total_intensity_stats.csv"),
os.path.join(qc_out_dir, self.point_name + "_percentile_99_9_stats.csv"),
]
if all([os.path.exists(qc_file) for qc_file in qc_metric_paths]):
warnings.warn(f"All QC metrics already extracted for FOV {self.point_name}")
return

metric_data = compute_qc_metrics_direct(
image_data=self.__fov_data,
fov_name=self.point_name,
Expand All @@ -232,7 +247,7 @@ def generate_mph(self, mph_out_dir, **kwargs):
Args:
mph_out_dir (str): where to output mph csvs to
**kwargs (dict):
Additional arguments for `toffy.qc_comp.compute_mph_metrics`. Accepted kwargs are:
Additional arguments for `toffy.mph_comp.compute_mph_metrics`. Accepted kwargs are:

- mass
- mass_start
Expand All @@ -242,6 +257,11 @@ def generate_mph(self, mph_out_dir, **kwargs):
if not os.path.exists(mph_out_dir):
os.makedirs(mph_out_dir)

mph_pulse_file = os.path.join(mph_out_dir, f"{self.point_name}" + "-mph_pulse.csv")
alex-l-kong marked this conversation as resolved.
Show resolved Hide resolved
if os.path.exists(mph_pulse_file):
warnings.warn(f"MPH pulse metrics already extracted for FOV {self.point_name}")
return

compute_mph_metrics(
bin_file_dir=self.run_folder,
csv_dir=mph_out_dir,
Expand All @@ -267,6 +287,11 @@ def generate_pulse_heights(self, pulse_out_dir: str, panel: pd.DataFrame = None,
if not os.path.exists(pulse_out_dir):
os.makedirs(pulse_out_dir)

pulse_height_file = os.path.join(pulse_out_dir, f"{self.point_name}" + "-pulse_heights.csv")
alex-l-kong marked this conversation as resolved.
Show resolved Hide resolved
if os.path.exists(pulse_height_file):
warnings.warn(f"Pulse heights per mass already extracted for FOV {self.point_name}")
return

write_mph_per_mass(
base_dir=self.run_folder,
output_dir=pulse_out_dir,
Expand Down
63 changes: 51 additions & 12 deletions tests/fov_watcher_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import platform
import shutil
import subprocess
import tempfile
import time
import warnings
Expand All @@ -10,15 +8,20 @@
from pathlib import Path
from unittest.mock import patch

import numpy as np
import pandas as pd
import pytest
from alpineer import io_utils
from pytest_cases import parametrize_with_cases
from skimage.io import imsave

from toffy.fov_watcher import start_watcher
from toffy.json_utils import write_json_file
from toffy.settings import QC_COLUMNS, QC_SUFFIXES
from toffy.watcher_callbacks import build_callbacks

from .utils.test_utils import (
TEST_CHANNELS,
RunStructureCases,
RunStructureTestContext,
WatcherCases,
Expand Down Expand Up @@ -128,8 +131,9 @@ def test_run_structure(run_json, expected_files, recwarn):
@patch("toffy.watcher_callbacks.visualize_mph", side_effect=mock_visualize_mph)
@pytest.mark.parametrize("add_blank", [False, True])
@pytest.mark.parametrize("temp_bin", [False, True])
@pytest.mark.parametrize("watcher_start_lag", [4, 8, 12])
@parametrize_with_cases("run_cbs, int_cbs, fov_cbs, kwargs, validators", cases=WatcherCases)
@parametrize_with_cases(
"run_cbs,int_cbs,fov_cbs,kwargs,validators,watcher_start_lag,existing_data", cases=WatcherCases
)
def test_watcher(
mock_viz_qc,
mock_viz_mph,
Expand All @@ -138,9 +142,10 @@ def test_watcher(
fov_cbs,
kwargs,
validators,
watcher_start_lag,
existing_data,
add_blank,
temp_bin,
watcher_start_lag,
):
try:
with tempfile.TemporaryDirectory() as tmpdir:
Expand Down Expand Up @@ -172,24 +177,58 @@ def test_watcher(
json_object=COMBINED_RUN_JSON_SPOOF,
)

# if existing_data set to True, test case where a FOV has already been extracted
if existing_data:
os.makedirs(os.path.join(tiff_out_dir, "fov-2-scan-1"))
for channel in TEST_CHANNELS:
random_img = np.random.rand(32, 32)
imsave(
os.path.join(tiff_out_dir, "fov-2-scan-1", f"{channel}.tiff"), random_img
)

os.makedirs(qc_out_dir)
for qcs, qcc in zip(QC_SUFFIXES, QC_COLUMNS):
df_qc = pd.DataFrame(
np.random.rand(len(TEST_CHANNELS), 3), columns=["fov", "channel", qcc]
)
df_qc["fov"] = "fov-2-scan-1"
df_qc["channel"] = TEST_CHANNELS
df_qc.to_csv(os.path.join(qc_out_dir, f"fov-2-scan-1_{qcs}.csv"), index=False)

os.makedirs(mph_out_dir)
df_mph = pd.DataFrame(
np.random.rand(1, 4), columns=["fov", "MPH", "total_count", "time"]
)
df_mph["fov"] = "fov-2-scan-1"
df_mph.to_csv(os.path.join(mph_out_dir, "fov-2-scan-1-mph_pulse.csv"), index=False)

os.makedirs(pulse_out_dir)
df_ph = pd.DataFrame(np.random.rand(10, 3), columns=["mass", "fov", "pulse_height"])
df_ph["fov"] = "fov-2-scan-1"
df_ph.to_csv(
os.path.join(pulse_out_dir, "fov-2-scan-1-pulse_heights.csv"), index=False
)

# `_slow_copy_sample_tissue_data` mimics the instrument computer uploading data to the
# client access computer. `start_watcher` is made async here since these processes
# wouldn't block each other in normal use

with Pool(processes=4) as pool:
pool.apply_async(
_slow_copy_sample_tissue_data,
(run_data, SLOW_COPY_INTERVAL_S, add_blank, temp_bin),
)

time.sleep(watcher_start_lag)

# watcher completion is checked every second
# zero-size files are halted for 1 second or until they have non zero-size
watcher_warnings = []
if not add_blank:
with pytest.warns(
UserWarning, match="Re-extracting incompletely extracted FOV fov-1-scan-1"
):
watcher_warnings.append(
r"Re-extracting incompletely extracted FOV fov-1-scan-1"
)
if existing_data:
watcher_warnings.append(r"already extracted for FOV fov-2-scan-1")

if len(watcher_warnings) > 0:
with pytest.warns(UserWarning, match="|".join(watcher_warnings)):
res_scan = pool.apply_async(
start_watcher,
(
Expand Down
58 changes: 56 additions & 2 deletions tests/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,48 @@
from toffy.json_utils import write_json_file
from toffy.settings import QC_COLUMNS, QC_SUFFIXES

TEST_CHANNELS = [
"Calprotectin",
"Chymase",
"SMA",
"Vimentin",
"LAG3",
"CD4",
"CD69",
"FAP",
"FOXP3",
"PD1",
"CD31",
"Biotin",
"Ecadherin",
"CD56",
"CD38",
"TCF1 TCF7",
"TBET",
"CD45RB",
"CD68",
"CD11c",
"CD8",
"CD3e",
"IDO1",
"CD45RO",
"TIM-3",
"CD163",
"CD20",
"FN1",
"Glut1",
"HLADR",
"CD14",
"CD45",
"Cytokeratin17",
"COL1A1",
"H3K27me3",
"CD57",
"H3K9ac",
"Ki67",
"HLA1 class ABC",
]


def make_run_file(tmp_dir, prefixes=[], include_nontiled=False):
"""Create a run subir and run json in the provided dir and return the path to this new dir."""
Expand Down Expand Up @@ -521,13 +563,25 @@ def case_default(self, intensity, replace):
["extract_tiffs", "generate_pulse_heights"],
kwargs,
validators,
1,
False,
)

@parametrize(intensity=(False, True))
@parametrize(replace=(False, True))
def case_inter_callback(self, intensity, replace):
rcs, _, fcs, kwargs, validators = self.case_default(intensity, replace)
rcs, _, fcs, kwargs, validators, wsl, ed = self.case_default(intensity, replace)
ics = rcs[:2]
rcs = rcs[2:]

return (rcs, ics, fcs, kwargs, validators)
return (rcs, ics, fcs, kwargs, validators, wsl, ed)

@parametrize(watcher_start_lag=(4, 8, 12))
def case_watcher_lag(self, watcher_start_lag):
rcs, ics, fcs, kwargs, validators, _, ed = self.case_default(True, True)
return (rcs, ics, fcs, kwargs, validators, watcher_start_lag, ed)

@parametrize(existing_data=(True, False))
def case_existing_data(self, existing_data):
rcs, ics, fcs, kwargs, validators, wsl, _ = self.case_default(False, False)
return (rcs, ics, fcs, kwargs, validators, wsl, existing_data)
Loading