diff --git a/src/nplinker/metabolomics/gnps/__init__.py b/src/nplinker/metabolomics/gnps/__init__.py index 35721a87..ac98b7d0 100644 --- a/src/nplinker/metabolomics/gnps/__init__.py +++ b/src/nplinker/metabolomics/gnps/__init__.py @@ -4,7 +4,6 @@ from .gnps_file_mapping_loader import GNPSFileMappingLoader from .gnps_format import GNPSFormat from .gnps_format import gnps_format_from_archive -from .gnps_format import gnps_format_from_file_mapping from .gnps_format import gnps_format_from_gnps1_task_id from .gnps_molecular_family_loader import GNPSMolecularFamilyLoader from .gnps_spectrum_loader import GNPSSpectrumLoader @@ -19,6 +18,5 @@ "GNPSMolecularFamilyLoader", "GNPSSpectrumLoader", "gnps_format_from_archive", - "gnps_format_from_file_mapping", "gnps_format_from_gnps1_task_id", ] diff --git a/src/nplinker/metabolomics/gnps/gnps_file_mapping_loader.py b/src/nplinker/metabolomics/gnps/gnps_file_mapping_loader.py index 3de045ab..c62e35fc 100644 --- a/src/nplinker/metabolomics/gnps/gnps_file_mapping_loader.py +++ b/src/nplinker/metabolomics/gnps/gnps_file_mapping_loader.py @@ -1,11 +1,11 @@ from __future__ import annotations import csv +import re from os import PathLike from pathlib import Path from nplinker.metabolomics.abc import FileMappingLoaderBase from nplinker.utils import is_file_format from .gnps_format import GNPSFormat -from .gnps_format import gnps_format_from_file_mapping class GNPSFileMappingLoader(FileMappingLoaderBase): @@ -23,11 +23,21 @@ class GNPSFileMappingLoader(FileMappingLoaderBase): 1. METABOLOMICS-SNETS - clusterinfosummarygroup_attributes_withIDs_withcomponentID/*.tsv 2. METABOLOMICS-SNETS-V2 - - clusterinfosummarygroup_attributes_withIDs_withcomponentID/*.clustersummary + - clusterinfosummarygroup_attributes_withIDs_withcomponentID/*.clustersummary (.tsv file) 3. FEATURE-BASED-MOLECULAR-NETWORKING - quantification_table*/*.csv + 4. GNPS2 classical_networking_workflow + - nf_output/clustering/featuretable_reformatted_presence.csv + 5. 
GNPS2 feature_based_molecular_networking_workflow + - nf_output/clustering/featuretable_reformated.csv + + + The `.tsv` files from different workflows have different headers, while the `.csv` files from + different workflows have consistent headers. """ + _CSV_GNPSFormats = (GNPSFormat.FBMN, GNPSFormat.GNPS2CN, GNPSFormat.GNPS2FBMN) + def __init__(self, file: str | PathLike) -> None: """Initialize the GNPSFileMappingLoader. @@ -44,7 +54,7 @@ def __init__(self, file: str | PathLike) -> None: >>> print(loader.mapping_reversed["26c.mzXML"]) {'1', '3', '7', ...} """ - self._gnps_format = gnps_format_from_file_mapping(file) + self._gnps_format = self._detect_gnps_format(file) if self._gnps_format is GNPSFormat.Unknown: raise ValueError("Unknown workflow type for GNPS file mappings file ") @@ -80,6 +90,29 @@ def mapping_reversed(self) -> dict[str, set[str]]: return mapping_reversed + def _detect_gnps_format(self, file: str | PathLike) -> GNPSFormat | tuple[GNPSFormat, ...]: + """Detect GNPS format(s) from the given file mapping file. + + The `.tsv` files from different workflows have different headers, while the `.csv` files + from different workflows have consistent headers. + + Args: + file: Path to the file mapping file whose format is to be detected. + + Returns: + A GNPS format, or the tuple of all csv-based formats if the header contains `row ID`. + """ + with open(file, "r") as f: + header = f.readline().strip() + + if re.search(r"\bAllFiles\b", header): + return GNPSFormat.SNETS + if re.search(r"\bUniqueFileSources\b", header): + return GNPSFormat.SNETSV2 + if re.search(r"\b{}\b".format(re.escape("row ID")), header): + return self._CSV_GNPSFormats + return GNPSFormat.Unknown + def _validate(self) -> None: """Validate the file mappings file. 
@@ -90,7 +123,7 @@ def _validate(self) -> None: required_file_formats = { GNPSFormat.SNETS: "tsv", GNPSFormat.SNETSV2: "tsv", - GNPSFormat.FBMN: "csv", + self._CSV_GNPSFormats: "csv", } if not is_file_format(self._file, required_file_formats[self._gnps_format]): raise ValueError( @@ -102,7 +135,7 @@ def _validate(self) -> None: required_columns = { GNPSFormat.SNETS: ["cluster index", "AllFiles"], GNPSFormat.SNETSV2: ["cluster index", "UniqueFileSources"], - GNPSFormat.FBMN: ["row ID", " Peak area"], + self._CSV_GNPSFormats: ["row ID", " Peak area"], } with open(self._file, mode="rt") as f: header = f.readline() @@ -116,7 +149,7 @@ def _validate(self) -> None: # validate that cluster index or row id must be unique with open(self._file, mode="rt") as f: - if self._gnps_format is GNPSFormat.FBMN: + if self._gnps_format is self._CSV_GNPSFormats: reader = csv.DictReader(f, delimiter=",") ids = [row["row ID"] for row in reader] else: @@ -136,8 +169,8 @@ def _load(self) -> None: self._load_snets() elif self._gnps_format is GNPSFormat.SNETSV2: self._load_snetsv2() - elif self._gnps_format is GNPSFormat.FBMN: - self._load_fbmn() + elif self._gnps_format is self._CSV_GNPSFormats: + self._load_csv() def _load_snets(self) -> None: """Load file mapping from output of GNPS SNETS workflow. @@ -178,8 +211,8 @@ def _load_snetsv2(self) -> None: samples = row["UniqueFileSources"].split("|") self._mapping[spectrum_id] = samples - def _load_fbmn(self) -> None: - """Load file mapping from output of GNPS FBMN workflow. + def _load_csv(self) -> None: + """Load file mapping that is in .csv format. The column "row ID" is loaded as spectrum id. 
diff --git a/src/nplinker/metabolomics/gnps/gnps_format.py b/src/nplinker/metabolomics/gnps/gnps_format.py index 2492627b..271156fd 100644 --- a/src/nplinker/metabolomics/gnps/gnps_format.py +++ b/src/nplinker/metabolomics/gnps/gnps_format.py @@ -1,5 +1,4 @@ from __future__ import annotations -import re import tarfile import zipfile from enum import Enum @@ -112,7 +111,7 @@ def gnps_format_from_archive(file: str | PathLike) -> GNPSFormat: return GNPSFormat.Unknown -def _gnps_format_from_archive_gnps1(file: PathLike) -> GNPSFormat: +def _gnps_format_from_archive_gnps1(file: Path) -> GNPSFormat: """Detect GNPS format from GNPS1 archive file.""" # Guess the format from the filename of the zip file if GNPSFormat.FBMN.value in file.name: @@ -137,7 +136,7 @@ def _gnps_format_from_archive_gnps1(file: PathLike) -> GNPSFormat: return GNPSFormat.Unknown -def _gnps_format_from_archive_gnps2(file: PathLike) -> GNPSFormat: +def _gnps_format_from_archive_gnps2(file: Path) -> GNPSFormat: """Detect GNPS format from GNPS2 archive file.""" with tarfile.open(file, "r") as tar: try: @@ -155,34 +154,3 @@ def _gnps_format_from_archive_gnps2(file: PathLike) -> GNPSFormat: if workflow == GNPSFormat.GNPS2CN.value: return GNPSFormat.GNPS2CN return GNPSFormat.Unknown - - -def gnps_format_from_file_mapping(file: str | PathLike) -> GNPSFormat: - """Detect GNPS format from the given file mapping file. - - The GNPS file mapping file is located in different folders depending on the - GNPS workflow. Here are the locations in corresponding GNPS zip archives: - - - `METABOLOMICS-SNETS` workflow: the `.tsv` file in the folder - `clusterinfosummarygroup_attributes_withIDs_withcomponentID` - - `METABOLOMICS-SNETS-V2` workflow: the `.clustersummary` file (tsv) in the folder - `clusterinfosummarygroup_attributes_withIDs_withcomponentID` - - `FEATURE-BASED-MOLECULAR-NETWORKING` workflow: the `.csv` file in the folder - `quantification_table` - - Args: - file: Path to the file to peek the format for. 
- - Returns: - GNPS format identified in the file. - """ - with open(file, "r") as f: - header = f.readline().strip() - - if re.search(r"\bAllFiles\b", header): - return GNPSFormat.SNETS - if re.search(r"\bUniqueFileSources\b", header): - return GNPSFormat.SNETSV2 - if re.search(r"\b{}\b".format(re.escape("row ID")), header): - return GNPSFormat.FBMN - return GNPSFormat.Unknown diff --git a/tests/unit/metabolomics/conftest.py b/tests/unit/metabolomics/conftest.py index 2ea042ca..2bcfef09 100644 --- a/tests/unit/metabolomics/conftest.py +++ b/tests/unit/metabolomics/conftest.py @@ -6,6 +6,33 @@ from .. import GNPS_DATA_DIR +# +# Fixtures for both GNPS1 and GNPS2 +# + + +@pytest.fixture(scope="session") +def tmp_gnps_dir(tmp_path_factory): + """Temporary root directory for testing gnps.""" + return tmp_path_factory.mktemp("gnps") + + +@pytest.fixture(scope="session", autouse=True) +def prepare_data(tmp_gnps_dir, gnps_zip_files, gnps2_tar_files): + """Extract GNPS1 zip and GNPS2 tar archives to the "tmp_gnps_dir" directory. + + The extracted archive is named after the workflow, for example the SNETS archive is extracted to + the "SNETS" directory in the "tmp_gnps_dir" directory. + + Note that the `autouse` must be set to `True` so that the fixture is executed before any other + test function. + """ + for workflow, zip_file in gnps_zip_files.items(): + extract_archive(zip_file, tmp_gnps_dir / workflow.name) + for workflow, tar_file in gnps2_tar_files.items(): + extract_archive(tar_file, tmp_gnps_dir / workflow.name) + + # # Fixtures for GNPS1 # @@ -49,26 +76,6 @@ def gnps_zip_files() -> dict[GNPSFormat, PathLike]: } -@pytest.fixture(scope="session") -def tmp_gnps_dir(tmp_path_factory): - """Temporary root directory for testing gnps.""" - return tmp_path_factory.mktemp("gnps") - - -@pytest.fixture(scope="session", autouse=True) -def prepare_data(tmp_gnps_dir, gnps_zip_files): - """Extract GNPS zip archives to the "tmp_gnps_dir" directory. 
- - The extracted archive is named after the workflow, e.g. "SNETS", "SNETSV2", "FBMN", so for - example the SNETS archive is extracted to the "SNETS" directory in the "tmp_gnps_dir" directory. - - Note that the `autouse` must be set to `True` so that the fixture is executed before any other - test function. - """ - for workflow, zip_file in gnps_zip_files.items(): - extract_archive(zip_file, tmp_gnps_dir / workflow.name) - - @pytest.fixture(scope="session") def gnps_file_mappings_files(tmp_gnps_dir) -> dict[GNPSFormat, PathLike]: """Get the paths of the GNPS file mappings.""" @@ -175,3 +182,71 @@ def gnps2_tar_files() -> dict[GNPSFormat, PathLike]: GNPSFormat.GNPS2FBMN: GNPS_DATA_DIR / "2014f321d72542afb5216c932e0d5079.tar", GNPSFormat.Unknown: GNPS_DATA_DIR / "gnps2_nnknown.tar", } + + +@pytest.fixture(scope="session") +def gnps2_file_mappings_files(tmp_gnps_dir) -> dict[GNPSFormat, PathLike]: + """Get the paths of the GNPS2 file mappings.""" + return { + GNPSFormat.GNPS2CN: tmp_gnps_dir + / GNPSFormat.GNPS2CN.name + / "nf_output" + / "clustering" + / "featuretable_reformatted_presence.csv", + GNPSFormat.GNPS2FBMN: tmp_gnps_dir + / GNPSFormat.GNPS2FBMN.name + / "nf_output" + / "clustering" + / "featuretable_reformated.csv", + } + + +@pytest.fixture(scope="session") +def gnps2_spectra_files(tmp_gnps_dir) -> dict[GNPSFormat, PathLike]: + """Get the paths of the GNPS2 spectra.""" + return { + GNPSFormat.GNPS2CN: tmp_gnps_dir + / GNPSFormat.GNPS2CN.name + / "nf_output" + / "clustering" + / "specs_ms.mgf", + GNPSFormat.GNPS2FBMN: tmp_gnps_dir + / GNPSFormat.GNPS2FBMN.name + / "nf_output" + / "clustering" + / "specs_ms.mgf", + } + + +@pytest.fixture(scope="session") +def gnps2_mf_files(tmp_gnps_dir) -> dict[GNPSFormat, PathLike]: + """Get the paths of the GNPS2 molecular family files.""" + return { + GNPSFormat.GNPS2CN: tmp_gnps_dir + / GNPSFormat.GNPS2CN.name + / "nf_output" + / "networking" + / "filtered_pairs.tsv", + GNPSFormat.GNPS2FBMN: tmp_gnps_dir + / 
GNPSFormat.GNPS2FBMN.name + / "nf_output" + / "networking" + / "filtered_pairs.tsv", + } + + +@pytest.fixture(scope="session") +def gnps2_annotations_files(tmp_gnps_dir) -> dict[GNPSFormat, PathLike]: + """Get the paths of the GNPS2 annotations file.""" + return { + GNPSFormat.GNPS2CN: tmp_gnps_dir + / GNPSFormat.GNPS2CN.name + / "nf_output" + / "library" + / "merged_results_with_gnps.tsv", + GNPSFormat.GNPS2FBMN: tmp_gnps_dir + / GNPSFormat.GNPS2FBMN.name + / "nf_output" + / "library" + / "merged_results_with_gnps.tsv", + } diff --git a/tests/unit/metabolomics/test_gnps_file_mapping_loader.py b/tests/unit/metabolomics/test_gnps_file_mapping_loader.py index 00b8e1c9..6b21b9dd 100644 --- a/tests/unit/metabolomics/test_gnps_file_mapping_loader.py +++ b/tests/unit/metabolomics/test_gnps_file_mapping_loader.py @@ -11,7 +11,7 @@ [GNPSFormat.SNETSV2, 7383, "140221_ME_14_13.mzML"], ], ) -def test_file_mapping_loader(workflow, num_spectra, filename, gnps_file_mappings_files): +def test_file_mapping_loader_gnps1(workflow, num_spectra, filename, gnps_file_mappings_files): loader = GNPSFileMappingLoader(gnps_file_mappings_files[workflow]) assert len(loader.mappings) == num_spectra # test file is in the mapping for spectrum "1" @@ -22,6 +22,20 @@ def test_file_mapping_loader(workflow, num_spectra, filename, gnps_file_mappings assert "5425_5426_mod.mzXML" not in loader.mappings["1"] +@pytest.mark.parametrize( + "workflow, num_spectra, filename", + [ + [GNPSFormat.GNPS2CN, 1051, "blk_g10_dora.mzML"], + [GNPSFormat.GNPS2FBMN, 371, "blk_g10_dora.mzML"], + ], +) +def test_file_mapping_loader_gnps2(workflow, num_spectra, filename, gnps2_file_mappings_files): + loader = GNPSFileMappingLoader(gnps2_file_mappings_files[workflow]) + assert len(loader.mappings) == num_spectra + # test file is in the mapping for spectrum "2" + assert filename in loader.mappings["2"] + + def test_mapping_reversed(gnps_file_mappings_files): loader = 
GNPSFileMappingLoader(gnps_file_mappings_files[GNPSFormat.SNETSV2]) assert len(loader.mapping_reversed) == 6 diff --git a/tests/unit/metabolomics/test_gnps_format.py b/tests/unit/metabolomics/test_gnps_format.py index fc267f91..11532c5d 100644 --- a/tests/unit/metabolomics/test_gnps_format.py +++ b/tests/unit/metabolomics/test_gnps_format.py @@ -1,7 +1,6 @@ import pytest from nplinker.metabolomics.gnps import GNPSFormat from nplinker.metabolomics.gnps import gnps_format_from_archive -from nplinker.metabolomics.gnps import gnps_format_from_file_mapping from nplinker.metabolomics.gnps import gnps_format_from_gnps1_task_id @@ -32,12 +31,6 @@ def test_gnps_format_from_archive_gnps1(workflow: str, gnps_zip_files): assert actual is workflow -@pytest.mark.parametrize("workflow", [GNPSFormat.FBMN, GNPSFormat.SNETS, GNPSFormat.SNETSV2]) -def test_gnps_format_from_file_mapping(workflow: str, gnps_file_mappings_files): - actual = gnps_format_from_file_mapping(gnps_file_mappings_files[workflow]) - assert actual is workflow - - # # Test GNPS2 formats #