From 037a1624b420ccf54eef2e5134c403e47609369a Mon Sep 17 00:00:00 2001
From: Cunliang Geng
Date: Thu, 14 Dec 2023 12:00:54 +0100
Subject: [PATCH] remove deprecated functions for loading metabolomics data

---
 src/nplinker/annotations.py               | 225 ----------
 src/nplinker/loader.py                    |  26 +-
 src/nplinker/metabolomics/load_gnps.py    | 502 ----------------------
 src/nplinker/metabolomics/metabolomics.py | 218 ----------
 tests/conftest.py                         |  10 -
 tests/metabolomics/test_load_gnps.py      |  69 ---
 tests/metabolomics/test_metabolomics.py   |  25 --
 7 files changed, 1 insertion(+), 1074 deletions(-)
 delete mode 100644 src/nplinker/annotations.py
 delete mode 100644 src/nplinker/metabolomics/load_gnps.py
 delete mode 100644 src/nplinker/metabolomics/metabolomics.py
 delete mode 100644 tests/metabolomics/test_load_gnps.py
 delete mode 100644 tests/metabolomics/test_metabolomics.py

diff --git a/src/nplinker/annotations.py b/src/nplinker/annotations.py
deleted file mode 100644
index 30c47871..00000000
--- a/src/nplinker/annotations.py
+++ /dev/null
@@ -1,225 +0,0 @@
-# Copyright 2021 The NPLinker Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import csv
-import os
-from deprecated import deprecated
-from nplinker.metabolomics import GNPS_KEY
-from nplinker.metabolomics import Spectrum
-from .logconfig import LogConfig
-
-
-logger = LogConfig.getLogger(__name__)
-
-GNPS_URL_FORMAT = (
-    "https://metabolomics-usi.gnps2.org/{}/?usi1=mzspec:GNPS:GNPS-LIBRARY:accession:{}"
-)
-GNPS_INDEX_COLUMN = "#Scan#"
-GNPS_DATA_COLUMNS = ["Compound_Name", "Organism", "MQScore", "SpectrumID"]
-
-
-def _headers_match_gnps(headers: list[str]) -> bool:
-    for k in GNPS_DATA_COLUMNS:
-        if k not in headers:
-            return False
-    return True
-
-
-@deprecated(version="1.3.3", reason="Use GNPSAnnotationLoader class instead.")
-def create_gnps_annotation(spec: Spectrum, gnps_anno: dict):
-    """Insert png, json, svg and spectrum URL information into the GNPS annotation data for a given spectrum.
-
-    Args:
-        spec(Spectrum): Spectrum to which to add the annotation.
-        gnps_anno(dict): Annotation metadata to add and augment.
-
-    Raises:
-        Exception: Raises exception if the spectrum already contains a GNPS annotation.
-    """
-    # also insert useful URLs
-    for t in ["png", "json", "svg", "spectrum"]:
-        gnps_anno[f"{t}_url"] = GNPS_URL_FORMAT.format(t, gnps_anno["SpectrumID"])
-
-    if GNPS_KEY in spec.annotations:
-        # TODO is this actually an error or can it happen normally?
-        raise Exception("Multiple GNPS annotations for Spectrum {}!".format(spec.spectrum_id))
-
-    spec.set_annotations(GNPS_KEY, [gnps_anno])
-    # shortcut, useful for rosetta code
-    spec.gnps_id = gnps_anno["SpectrumID"]
-
-
-@deprecated(version="1.3.3", reason="Use GNPSAnnotationLoader class instead.")
-def load_annotations(
-    root: str | os.PathLike,
-    config: str | os.PathLike,
-    spectra: list[Spectrum],
-    spec_dict: dict[str, Spectrum],
-) -> list[Spectrum]:
-    """Load the annotations from the GNPS annotation file present in root to the spectra.
-
-    Args:
-        root(str | os.PathLike): Path to the downloaded and extracted GNPS results.
-        config(str | os.PathLike): Path to config file for custom file locations.
-        spectra(list[Spectrum]): List of spectra to annotate.
-        spec_dict(dict[str, Spectrum]): Dictionary mapping spectrum IDs to the spectra passed in `spectra`.
-
-    Raises:
-        Exception: Raises exception if the custom annotation config file has invalid content.
-
-    Returns:
-        list[Spectrum]: List of annotated spectra.
-    """
-    if not os.path.exists(root):
-        logger.debug(f"Annotation directory not found ({root})")
-        return spectra
-
-    ac = _read_config(config)
-
-    logger.debug(f"Parsed {len(ac)} annotations configuration entries")
-
-    annotation_files = _find_annotation_files(root, config)
-
-    logger.debug("Found {} annotations .tsv files in {}".format(len(annotation_files), root))
-
-    for af in annotation_files:
-        with open(af) as f:
-            rdr = csv.reader(f, delimiter="\t")
-            headers = next(rdr)
-            filename = os.path.split(af)[1]
-
-            if _headers_match_gnps(headers):
-                # assume this is our main annotations file
-                logger.debug(f"Parsing GNPS annotations from {af}")
-
-                scans_index = headers.index(GNPS_INDEX_COLUMN)
-
-                # each line should be a different spec ID here
-                for line in rdr:
-                    # read the scan ID column and get the corresponding Spectrum object
-                    scan_id = line[scans_index]
-                    if scan_id not in spec_dict:
-                        logger.warning(
-                            "Unknown spectrum ID found in GNPS annotation file (ID={})".format(
-                                scan_id
-                            )
-                        )
-                        continue
-
-                    spec = spec_dict[scan_id]
-                    data_cols = set(GNPS_DATA_COLUMNS)
-                    # merge in any extra columns the user has provided
-                    if filename in ac:
-                        data_cols.update(ac[filename][1])
-
-                    data = {}
-                    for dc in data_cols:
-                        if dc not in headers:
-                            logger.warning(f'Column lookup failed for "{dc}"')
-                            continue
-                        data[dc] = line[headers.index(dc)]
-
-                    create_gnps_annotation(spec, data)
-            else:
-                logger.debug(f"Parsing general annotations from {af}")
-                # this is a general annotations file, so rely purely on the user-provided columns
-                if filename not in ac:
-                    logger.warning(
-                        'Failed to parse annotations from "{}", no config info supplied in {}'.format(
-                            filename, config
-                        )
-                    )
-                    continue
-
-                index_col, data_cols = ac[filename]
-                if index_col not in headers:
-                    raise Exception(
-                        'Annotation index column "{}" not found in file "{}"!'.format(
-                            index_col, filename
-                        )
-                    )
-
-                spec_id_index = headers.index(index_col)
-
-                # note that we might have multiple lines for the same spec ID!
-                spec_annotations = {}
-                for line in rdr:
-                    scan_id = line[spec_id_index]
-                    if scan_id not in spec_dict:
-                        logger.warning(
-                            'Unknown spectrum ID found in annotation file "{}", ID is "{}"'.format(
-                                filename, scan_id
-                            )
-                        )
-                        continue
-
-                    spec = spec_dict[scan_id]
-                    if spec not in spec_annotations:
-                        spec_annotations[spec] = []
-
-                    data = {}
-                    for dc in data_cols:
-                        if dc not in headers:
-                            logger.warning(f'Column lookup failed for "{dc}"')
-                            continue
-                        data[dc] = line[headers.index(dc)]
-                    spec_annotations[spec].append(data)
-
-                logger.debug("Adding annotation data to {} spectra".format(len(spec_annotations)))
-                for spec, anno in spec_annotations.items():
-                    spec.set_annotations(filename, anno)
-
-    return spectra
-
-
-def _find_annotation_files(root: str, config: str) -> list[str]:
-    """Detect all annotation files in the root folder or specified in the config file.
-
-    Args:
-        root(str): Root folder in which to look for annotation files in .tsv format.
-        config(str): Config file to load custom annotation files.
-
-    Returns:
-        list[str]: List of detected annotation files.
-    """
-    annotation_files = []
-    for f in os.listdir(root):
-        if f == os.path.split(config)[1] or not f.endswith(".tsv"):
-            continue
-        annotation_files.append(os.path.join(root, f))
-    return annotation_files
-
-
-def _read_config(config):
-    ac = {}
-    if os.path.exists(config):
-        # parse annotations config file if it exists
-        with open(config) as f:
-            rdr = csv.reader(f, delimiter="\t")
-            for row in rdr:
-                # expecting 3 columns: filename, index col name, data col name(s)
-                if len(row) != 3:
-                    logger.warning("Malformed line in annotations configuration: {}".format(row))
-                    continue
-                # record the columns with filename as key
-                data_cols = row[2].split(",")
-                if len(data_cols) == 0:
-                    logger.warning(
-                        "No data columns selected in annotation file {}, skipping it!".format(
-                            row[0]
-                        )
-                    )
-                    continue
-                ac[row[0]] = (row[1], data_cols)
-    return ac
diff --git a/src/nplinker/loader.py b/src/nplinker/loader.py
index abaa5e5f..7c0ab98c 100644
--- a/src/nplinker/loader.py
+++ b/src/nplinker/loader.py
@@ -1,7 +1,6 @@
 import glob
 import os
 from pathlib import Path
-from nplinker.annotations import load_annotations
 from nplinker.class_info.chem_classes import ChemClassPredictions
 from nplinker.class_info.class_matches import ClassMatches
 from nplinker.class_info.runcanopus import run_canopus
@@ -18,7 +17,6 @@
 from nplinker.globals import PFAM_PATH
 from nplinker.globals import STRAIN_MAPPINGS_FILENAME
 from nplinker.logconfig import LogConfig
-from nplinker.metabolomics.metabolomics import load_dataset
 from nplinker.pairedomics.downloader import PODPDownloader
 from nplinker.pairedomics.runbigscape import run_bigscape
 from nplinker.pairedomics.strain_mappings_generator import podp_generate_strain_mappings
@@ -401,30 +399,8 @@ def _load_strain_mappings(self):
         return True
 
-    # TODO CG: replace deprecated load_dataset with GNPSLoader
+    # TODO CG: rewrite the loading process using GNPSLoader
     def _load_metabolomics(self):
-        spec_dict, self.spectra, self.molfams, unknown_strains = load_dataset(
-            self.strains,
-            self.mgf_file,
-            self.edges_file,
-            self.nodes_file,
-            self.quantification_table_file,
-            self.metadata_table_file,
-            self._extended_metadata_table_parsing,
-        )
-
-        us_path = os.path.join(self._root, "unknown_strains_met.csv")
-        logger.warning("Writing unknown strains from METABOLOMICS data to {}".format(us_path))
-        with open(us_path, "w") as us:
-            us.write("# unknown strain label\n")
-            for strain in unknown_strains.keys():
-                us.write(f"{strain}\n")
-
-        # load any available annotations from GNPS or user-provided files
-        logger.info("Loading provided annotation files ({})".format(self.annotations_dir))
-        self.spectra = load_annotations(
-            self.annotations_dir, self.annotations_config_file, self.spectra, spec_dict
-        )
         return True
 
     def _load_genomics(self):
diff --git a/src/nplinker/metabolomics/load_gnps.py b/src/nplinker/metabolomics/load_gnps.py
deleted file mode 100644
index 6caf3f91..00000000
--- a/src/nplinker/metabolomics/load_gnps.py
+++ /dev/null
@@ -1,502 +0,0 @@
-import csv
-import os
-import re
-from typing import Any
-from deprecated import deprecated
-from nplinker.logconfig import LogConfig
-from nplinker.metabolomics import Spectrum
-from nplinker.metabolomics.gnps import GNPSFormat
-from nplinker.metabolomics.gnps import gnps_format_from_file_mapping
-from nplinker.strain import Strain
-from nplinker.strain_collection import StrainCollection
-
-
-logger = LogConfig.getLogger(__name__)
-
-# compile a regex for matching .mzXML and .mzML strings
-RE_MZML_MZXML = re.compile(".mzXML|.mzML")
-
-#
-# methods for parsing metabolomics data files
-#
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSFileMappingLoader class instead.")
-def _messy_strain_naming_lookup(mzxml: str, strains: StrainCollection) -> Strain | None:
-    """Wrapper around StrainCollection::lookup which is more permissive and handles commonly occurring cases of imperfect aliasing.
-
-    Args:
-        mzxml(str): Messy alias to use as a base for augmented lookup.
-        strains(StrainCollection): StrainCollection in which to search for the strain.
-
-    Returns:
-        Strain or None: The strain identified as matching, or None.
-    """
-    if strains.has_name(mzxml):
-        # life is simple!
-        return strains.lookup(mzxml)
-
-    # 1. knock off the .mzXML and try again (using splitext should also handle
-    # other extensions like .mzML here)
-    mzxml_no_ext = os.path.splitext(mzxml)[0]
-    if strains.has_name(mzxml_no_ext):
-        return strains.lookup(mzxml_no_ext)
-
-    # 2. if that doesn't work, try using everything up to the first -/_
-    underscore_index = mzxml_no_ext.find("_")
-    hyphen_index = mzxml_no_ext.find("-")
-    mzxml_trunc_underscore = (
-        mzxml_no_ext if underscore_index == -1 else mzxml_no_ext[:underscore_index]
-    )
-    mzxml_trunc_hyphen = mzxml_no_ext if hyphen_index == -1 else mzxml_no_ext[:hyphen_index]
-
-    if underscore_index != -1 and strains.has_name(mzxml_trunc_underscore):
-        return strains.lookup(mzxml_trunc_underscore)
-    if hyphen_index != -1 and strains.has_name(mzxml_trunc_hyphen):
-        return strains.lookup(mzxml_trunc_hyphen)
-
-    # 3. in the case of the original Crusemann dataset, many of the filenames seem to
-    # match up to real strains with the initial "CN" missing ???
-    for mzxml_trunc in [mzxml_trunc_hyphen, mzxml_trunc_underscore]:
-        if strains.has_name("CN" + mzxml_trunc):
-            return strains.lookup("CN" + mzxml_trunc)
-
-    # give up
-    return None
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSFileMappingLoader class instead.")
-def _md_convert(val: Any) -> int | float | None | Any:
-    """Try to convert raw metadata values from text to integer, then float if that fails.
-
-    Args:
-        val(Any): Value to parse.
-
-    Returns:
-        int | float | None | Any: Value as int or float, None if it is the string 'n/a', or the original value if parsing is not possible.
-    """
-    try:
-        return int(val)
-    except (ValueError, TypeError):
-        try:
-            return float(val)
-        except (ValueError, TypeError):
-            if val.lower() == "n/a":
-                return None
-            return val
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSFileMappingLoader class instead.")
-def _parse_mzxml_header(
-    hdr: str,
-    strains: StrainCollection,
-    md_table: dict[str, dict[str, str]],
-    ext_metadata_parsing: bool,
-) -> tuple[str | None, str | None, bool]:
-    """Return the file identifier component from the column name.
-
-    Args:
-        hdr(str): Column name to search for the file identifier component.
-        strains(StrainCollection): StrainCollection in which to look for the strain name (= file identifier component).
-        md_table(dict[str, dict[str, str]]): Metadata table dictionary from which to parse `strain` and `growthmedium` keys.
-        ext_metadata_parsing(bool): Whether to use extended metadata parsing or not.
-
-    Returns:
-        tuple[str | None, str | None, bool]: Strain name or None, detected growth medium or None, and whether the strain is unknown (i.e. not contained in the strain collection).
-
-    Examples:
-        Given a column header from the quantification_table file produced by GNPS,
-        this method checks if it's one that contains peak information for a given
-        strain label by searching for the 'Peak area' string, e.g.
-        'KRD168_ISP3.mzML Peak area'. It then extracts the strain label by taking
-        the text up to the '.mzML' part and attempts to match this to the set of
-        strains provided by the user in the strain_mappings file. Finally, it also
-        tries to extract the growth medium label as given in the metadata_table
-        file, again using the strain label which should match between the two files.
-    """
-    # assume everything up to '.mz' is the identifier/label of this strain
-    strain_name = hdr[: hdr.index(".mz")]
-    growth_medium = None
-
-    # check if the strain label exists in the set of strain mappings the user
-    # has provided
-    if not strains.has_name(strain_name):
-        # if this check fails, it could mean a missing strain ID mapping. this isn't
-        # fatal but will produce a warning and an entry added to the file
-        # unknown_strains_met.csv in the dataset folder.
-        #
-        # however there could be some identifiers which are not strains and
-        # these should just be ignored.
-        #
-        # if we have a metadata table file, and the parsed strain name does NOT
-        # appear in it, this indicates it's not a strain
-        if md_table is not None and strain_name not in md_table:
-            # we can ignore this completely
-            return (None, None, False)
-
-        # if the strain is in the table, then it means the user probably hasn't given us
-        # a valid mapping for it, so a warning will be displayed and the name added
-        # to the unknown_strains.csv file later on. however if the config file option
-        # extended_metadata_table_parsing has been enabled (the ext_metadata_parsing
-        # parameter in this method), it means we should take the content of the
-        # ATTRIBUTE_Strain column from the table and use that as the strain, adding
-        # an extra alias for it too. Additionally the ATTRIBUTE_Medium column content
-        # should be recorded as the growth medium for the strain
-        if md_table is not None and strain_name in md_table and ext_metadata_parsing:
-            growth_medium = md_table[strain_name]["growthmedium"]
-            strain_col_content = md_table[strain_name]["strain"]
-
-            if strains.has_name(strain_col_content):
-                strain = strains.lookup(strain_col_content)
-                strain.add_alias(strain_name)
-                # this merges the set of aliases back into the internal
-                # lookup table in the StrainCollection
-                strains.add(strain)
-            else:
-                # if this still fails it's an unknown strain
-                logger.warning(
-                    "Unknown strain identifier: {} (parsed from {})".format(strain_name, hdr)
-                )
-                return (strain_name, growth_medium, True)
-        else:
-            # emit a warning message to indicate that the user needs to add this to
-            # their strain_mappings file
-            logger.warning(
-                "Unknown strain identifier: {} (parsed from {})".format(strain_name, hdr)
-            )
-
-    return (strain_name, growth_medium, not strains.has_name(strain_name))
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSFileMappingLoader class instead.")
-def _load_clusterinfo_old(
-    gnps_format: str, strains: StrainCollection, file: str, spec_dict: dict[str, Spectrum]
-) -> dict[str, int]:
-    """Load info about clusters from old-style GNPS files.
-
-    Args:
-        gnps_format(str): Identifier for the GNPS format of the file. Has to be one of [GNPSFormat.SNETS, GNPSFormat.SNETSV2].
-        strains(StrainCollection): StrainCollection in which to search for the detected strains.
-        file(str): Path to file from which to load the cluster information.
-        spec_dict(dict[str, Spectrum]): Dictionary with already loaded spectra into which the metadata read from the file will be inserted.
-
-    Raises:
-        Exception: Raises exception if an unsupported GNPS format was detected.
-
-    Returns:
-        dict[str, int]: Dictionary with unknown strain names as keys and counts of occurrence as values.
-    """
-    # each line of this file represents a metabolite.
-    # columns representing strain IDs are *ignored* here in favour of parsing
-    # .mz(X)ML filenames from either the AllFiles or UniqueFileSources column.
-    # both of these list the .mz(X)ML files the molecule was found in (plus the scan
-    # number in the file in the AllFiles case)
-    unknown_strains = {}
-    with open(file) as f:
-        reader = csv.reader(f, delimiter="\t")
-        headers = next(reader)
-        clu_index_index = headers.index("cluster index")
-        if gnps_format == GNPSFormat.SNETS:
-            mzxml_index = headers.index("AllFiles")
-        elif gnps_format == GNPSFormat.SNETSV2:
-            mzxml_index = headers.index("UniqueFileSources")
-        else:
-            raise Exception(f"Unexpected GNPS format {gnps_format}")
-
-        for line in reader:
-            # get the values of the important columns
-            clu_index = line[clu_index_index]
-            if gnps_format == GNPSFormat.SNETSV2:
-                mzxmls = line[mzxml_index].split("|")
-            else:
-                mzxmls = line[mzxml_index].split("###")
-
-            metadata = {"cluster_index": clu_index, "files": {}}
-            seen_files = set()
-
-            for data in mzxmls:
-                # TODO ok to ignore scan number if available?
-                mzxml = data if gnps_format == GNPSFormat.SNETSV2 else data.split(":")[0]
-
-                # TODO is this OK/sensible?
-                if mzxml in seen_files:
-                    continue
-                seen_files.add(mzxml)
-
-                # add to the list of files for this molecule
-                metadata["files"][mzxml] = mzxml
-
-                # should have a strain alias for the mzXML file to look up here (in theory at least)
-                strain = _messy_strain_naming_lookup(mzxml, strains)
-                if strain is None:
-                    # TODO: how to handle this? can't just give up, simply warn?
-                    if mzxml not in unknown_strains:
-                        logger.warning(
-                            "Unknown strain: {} for cluster index {}".format(mzxml, clu_index)
-                        )
-                        unknown_strains[mzxml] = 1
-                    else:
-                        unknown_strains[mzxml] += 1
-                # else:
-                #     print('{} ===> {}'.format(mzxml, strain))
-                if clu_index in spec_dict:
-                    if strain is not None:
-                        # TODO this needs fixing somehow (missing growth medium info)
-                        spec_dict[clu_index].add_strain(strain, None, 1)
-
-                    # update metadata on Spectrum object
-                    spec_dict[clu_index].metadata.update(metadata)
-
-    if len(unknown_strains) > 0:
-        logger.warning(
-            "{} unknown strains were detected a total of {} times".format(
-                len(unknown_strains), sum(unknown_strains.values())
-            )
-        )
-
-    return unknown_strains
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSFileMappingLoader class instead.")
-def _parse_metadata_table(file: str) -> dict[str, dict[str, str | None]]:
-    """Parse the metadata table file from GNPS.
-
-    Args:
-        file(str): Path to the metadata table.
-
-    Returns:
-        dict[str, dict[str, str | None]]: Parsed metadata, mapping from filenames to the metadata dictionary.
-
-    """
-    table = {}
-    with open(file) as f:
-        reader = csv.reader(f, delimiter="\t")
-        headers = next(reader)
-
-        # check the format is as expected
-        if len(headers) < 3 or headers[0] != "filename":
-            # expecting at least 3 columns with the first being 'filename'
-            logger.error('Unrecognised metadata table format in file "{}"'.format(file))
-            raise Exception("Expecting at least 3 columns with the first being 'filename'")
-
-        # find the column numbers we're interested in (can't rely on both of these being present)
-        # ATTRIBUTE_SampleType, _Strain, _Medium, _Organism
-        sampletype_col, medium_col, strain_col = -1, -1, -1
-        col_names = {
-            "sampletype": "ATTRIBUTE_SampleType",
-            "growthmedium": "ATTRIBUTE_Medium",
-            "strain": "ATTRIBUTE_Strain",
-        }
-
-        # also: ATTRIBUTE_Strain might be useful, there's also ATTRIBUTE_Organism
-        for i, hdr in enumerate(headers):
-            if hdr == col_names["sampletype"]:
-                sampletype_col = i
-            if hdr == col_names["growthmedium"]:
-                medium_col = i
-            if hdr == col_names["strain"]:
-                strain_col = i
-
-        for col, name in [
-            (sampletype_col, col_names["sampletype"]),
-            (medium_col, col_names["growthmedium"]),
-            (strain_col, col_names["strain"]),
-        ]:
-            if col == -1:
-                logger.warning('No {} column in file "{}"'.format(name, file))
-
-        for line in reader:
-            # only non-BLANK entries
-            if sampletype_col != -1 and line[sampletype_col] != "BLANK":
-                # want to strip out the .mz(X)ML extension here to use as the key,
-                # then record the available columns (assume SampleType is always
-                # available)
-                data = {"sampletype": line[sampletype_col], "growthmedium": None, "strain": None}
-                if medium_col != -1:
-                    data["growthmedium"] = line[medium_col]
-                if strain_col != -1:
-                    data["strain"] = line[strain_col]
-                table[RE_MZML_MZXML.sub("", line[0])] = data
-
-    return table
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSFileMappingLoader class instead.")
-def _load_clusterinfo_fbmn(
-    strains: StrainCollection,
-    nodes_file: str,
-    extra_nodes_file: str,
-    md_table_file: str,
-    spec_dict: dict[str, Spectrum],
-    ext_metadata_parsing: bool,
-) -> tuple[dict[str, dict[str, str | None]], dict[str, str]]:
-    """Load the cluster info from a feature-based molecular networking run output from GNPS.
-
-    Args:
-        strains(StrainCollection): StrainCollection in which to look for the file identifiers / strain names.
-        nodes_file(str): File from which to load the cluster information.
-        extra_nodes_file(str): Unknown.
-        md_table_file(str): Path to metadata table. Deprecated.
-        spec_dict(dict[str, Spectrum]): Dictionary with already loaded spectra.
-        ext_metadata_parsing(bool): Whether to use extended metadata parsing.
-
-    Returns:
-        tuple[dict[str, dict], dict[str, str]]: Spectra info mapping from spectrum ID to all columns in the nodes file, and unknown strain mapping from file identifier to spectrum ID.
- """ - spec_info = {} - - # parse metadata table if available - md_table = _parse_metadata_table(md_table_file) - - # combine the information in the nodes_file (clusterinfo_summary folder) and - # the extra_nodes_file (quantification_table folder), indexed by the "cluster index" - # and "row ID" fields respectively to link the rows - with open(nodes_file) as f: - reader = csv.reader(f, delimiter="\t") - headers = next(reader) - ci_index = headers.index("cluster index") - - # take each line and generate a dict storing each header:column value - # and insert that dict into the spec_info dict - for line in reader: - tmp = {} - for i, v in enumerate(line): - tmp[headers[i]] = v - spec_info[line[ci_index]] = tmp - - with open(extra_nodes_file) as f: - reader = csv.reader(f, delimiter=",") - headers = next(reader) - ri_index = headers.index("row ID") - - # do almost the same thing again but a) match the "cluster index" from the - # nodes_file to the "row ID" from this file, and update the per-row dict - # with the extra columns from this file - for line in reader: - ri = line[ri_index] - tmp = {} - for i, v in enumerate(line): - tmp[headers[i]] = v - spec_info[ri].update(tmp) - - logger.info("Merged nodes data (new-style), total lines = {}".format(len(spec_info))) - - unknown_strains = {} - - # TODO: at the moment this iterates over each spectrum in the outer loop, and - # over the full set of columns in the inner loop. probably makes more sense to - # swap that around, or at least to avoid calling parse_mzxml_header repeatedly - # for the same column headers over and over! - - # for each spectrum - for spec_id, spec_data in spec_info.items(): - # look up the Spectrum object with this ID (spec_dict is sourced from the MGF file) - spectrum = spec_dict[spec_id] - - # TODO this should probably be handled by parse_metadata_table - if "ATTRIBUTE_SampleType" in spec_data: - st = spec_data["ATTRIBUTE_SampleType"].split(",") - spectrum.metadata["ATTRIBUTE_SampleType"] = st - - # TODO better way of filtering/converting all this stuff down to what's relevant? - # could search for each strain ID in column title but would be slower? - # - # iterate over the column values for the spectrum - for k, v in spec_data.items(): - # if the value is a "0", ignore immediately as we only care about nonzero peak intensities - if v == "0": - continue - - # ignore any non-"Peak area" columns, want ".mz(X)ML Peak area" - if k.find(" Peak area") == -1: - # record the value as an entry in the metadata dict in case it's useful - # for some other purpose - spectrum.metadata[k] = v - continue - - # TODO this will probably need updating for platform data (should already - # have a set of strain names...) - # - # given a ".mz(X)ML Peak area" column value, attempt to lookup the - # matching Strain object from the set of mappings we have (optionally doing - # some further stuff involving the metadata table) - (strain_name, growth_medium, is_unknown) = _parse_mzxml_header( - k, strains, md_table, ext_metadata_parsing - ) - - # if the strain name couldn't be recognised at all, ignore it - if strain_name is None: - continue - - # if the name seems valid (appears in the metadata table), record it as unknown - if is_unknown: - unknown_strains[k[: k.index(".mz")]] = spec_id - continue - - # create a new strain object if the intensity value is a float > 0 - v = _md_convert(v) - if strains.has_name(strain_name) and isinstance(v, float) and float(v) > 0: - # find the strain object, and add the growth medium + intensity to it. 
-                # growth medium will only be set if extended_metadata_table_parsing is
-                # enabled in the config file and the metadata table file contains that info
-                v_float = float(v)
-                strain = strains.lookup(strain_name)
-                spectrum.add_strain(strain, growth_medium, v_float)
-
-            # record this as an entry in the metadata dict as well
-            spectrum.metadata[k] = v
-
-    return spec_info, unknown_strains
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSFileMappingLoader class instead.")
-def load_gnps(
-    strains: StrainCollection,
-    nodes_file: str,
-    quant_table_file: str,
-    metadata_table_file: str,
-    ext_metadata_parsing: bool,
-    spec_dict: dict[str, Spectrum],
-) -> dict[str, int]:
-    """Wrapper function to load information from GNPS outputs.
-
-    Args:
-        strains(StrainCollection): StrainCollection in which to add/look for the strains.
-        nodes_file(str): Path to the file holding the cluster information.
-        quant_table_file(str): Path to the quantification table.
-        metadata_table_file(str): Path to the metadata table.
-        ext_metadata_parsing(bool): Whether to use extended metadata parsing.
-        spec_dict(dict[str, Spectrum]): Mapping from spectrum ID to the spectra loaded from the MGF file.
-
-    Raises:
-        Exception: Raises exception if an unknown GNPS format is encountered.
-
-    Returns:
-        dict[str, int]: Mapping from unknown strain names to occurrence counts (older formats) or to the spectrum IDs in which they occur (FBMN format).
-    """
-    gnps_format = gnps_format_from_file_mapping(nodes_file)
-    logger.debug(
-        "Nodes_file: {}, quant_table_exists?: {}".format(nodes_file, quant_table_file is None)
-    )
-    if gnps_format == GNPSFormat.Unknown:
-        raise Exception("Unknown/unsupported GNPS data format")
-
-    # now things depend on the dataset format
-    # if we don't have a quantification table, must be older-style dataset (or unknown format)
-    if gnps_format != GNPSFormat.FBMN and quant_table_file is None:
-        logger.info("Found older-style GNPS dataset, no quantification table")
-        unknown_strains = _load_clusterinfo_old(gnps_format, strains, nodes_file, spec_dict)
-    else:
-        logger.info("quantification table exists, new-style GNPS dataset")
-        _, unknown_strains = _load_clusterinfo_fbmn(
-            strains,
-            nodes_file,
-            quant_table_file,
-            metadata_table_file,
-            spec_dict,
-            ext_metadata_parsing,
-        )
-
-    return unknown_strains
diff --git a/src/nplinker/metabolomics/metabolomics.py b/src/nplinker/metabolomics/metabolomics.py
deleted file mode 100644
index 9d631e2a..00000000
--- a/src/nplinker/metabolomics/metabolomics.py
+++ /dev/null
@@ -1,218 +0,0 @@
-# Copyright 2021 The NPLinker Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import csv
-from os import PathLike
-from deprecated import deprecated
-from nplinker.annotations import create_gnps_annotation
-from nplinker.logconfig import LogConfig
-from nplinker.parsers.mgf import LoadMGF
-from .load_gnps import load_gnps
-from .molecular_family import MolecularFamily
-from .singleton_family import SingletonFamily
-from .spectrum import Spectrum
-
-
-logger = LogConfig.getLogger(__name__)
-
-
-def _mols_to_spectra(ms2, metadata):
-    ms2_dict = {}
-    for m in ms2:
-        if m[3] not in ms2_dict:
-            ms2_dict[m[3]] = []
-        ms2_dict[m[3]].append((m[0], m[2]))
-
-    spectra = []
-    for i, m in enumerate(ms2_dict.keys()):
-        new_spectrum = Spectrum(
-            i,
-            ms2_dict[m],
-            m.name,
-            metadata[m.name]["precursormass"],
-            metadata[m.name]["parentmass"],
-        )
-        new_spectrum.metadata = metadata[m.name]
-        # add GNPS ID if in metadata under SPECTRUMID (this doesn't seem to be in regular MGF files
-        # from GNPS, but *is* in the rosetta mibig MGF)
-        # note: LoadMGF seems to lowercase (some) metadata keys?
-        if "spectrumid" in new_spectrum.metadata:
-            # add an annotation for consistency, if not already there
-            if "gnps" not in new_spectrum.annotations:
-                gnps_anno = {
-                    k: None for k in ["Compound_Name", "Organism", "MQScore", "SpectrumID"]
-                }
-                gnps_anno["SpectrumID"] = new_spectrum.metadata["spectrumid"]
-                create_gnps_annotation(new_spectrum, gnps_anno)
-        spectra.append(new_spectrum)
-
-    return spectra
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSMolecularFamilyLoader class instead.")
-def load_edges(edges_file: str | PathLike, spec_dict: dict[str, Spectrum]):
-    """Insert information about the molecular family into the spectra.
-
-    Args:
-        edges_file(str): File containing the molecular families.
-        spec_dict(dict[str, Spectrum]): Dictionary with mapping from spectrum ID to Spectrum.
-
-    Raises:
-        Exception: Raises exception if the edges file doesn't contain the correct columns.
-    """
-    logger.debug("loading edges file: {} [{} spectra from MGF]".format(edges_file, len(spec_dict)))
-    with open(edges_file) as f:
-        reader = csv.reader(f, delimiter="\t")
-        headers = next(reader)
-        try:
-            cid1_index = headers.index("CLUSTERID1")
-            cid2_index = headers.index("CLUSTERID2")
-            cos_index = headers.index("Cosine")
-            fam_index = headers.index("ComponentIndex")
-        except ValueError:
-            raise Exception("Unknown or missing column(s) in edges file: {}".format(edges_file))
-
-        for line in reader:
-            spec1_id = line[cid1_index]
-            spec2_id = line[cid2_index]
-            cosine = float(line[cos_index])
-            family = line[fam_index]
-
-            if spec1_id in spec_dict and spec2_id in spec_dict:
-                spec1 = spec_dict[spec1_id]
-                spec2 = spec_dict[spec2_id]
-
-                if family != "-1":  # singletons
-                    spec1.family_id = family
-                    spec2.family_id = family
-
-                    spec1.edges.append((spec2.id, spec2.spectrum_id, cosine))
-                    spec2.edges.append((spec1.id, spec1.spectrum_id, cosine))
-                else:
-                    spec1.family_id = family
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSLoader class instead.")
-def load_dataset(
-    strains,
-    mgf_file,
-    edges_file,
-    nodes_file,
-    quant_table_file=None,
-    metadata_table_file=None,
-    ext_metadata_parsing=False,
-):
-    """Wrapper function used in place of the GNPSLoader class.
-
-    Loads all data required for the metabolomics side.
-
-    Args:
-        strains(StrainCollection): Object holding the strain collections.
-        mgf_file(string): Path to spectra file in MGF format.
-        edges_file(string): Path to GNPS edges.pairinfo file.
-        nodes_file(string): Path to GNPS networking output file in tsv format.
-        quant_table_file (string, optional): Path to quantification/annotation table. Defaults to None.
-        metadata_table_file (string, optional): Unknown. Defaults to None.
-        ext_metadata_parsing (bool, optional): Unknown. Defaults to False.
-
-    Returns:
-        Tuple[dict, List[Spectrum], List[MolecularFamily], dict]: Loaded metabolomics data.
-    """
-    # common steps to all formats of GNPS data:
-    # - parse the MGF file to create a set of Spectrum objects
-    # - parse the edges file and update the spectra with that data
-
-    # build a set of Spectrum objects by parsing the MGF file
-    spec_dict = load_spectra(mgf_file, edges_file)
-
-    # spectra = GNPSSpectrumLoader(mgf_file).spectra()
-    # above returns a list, create a dict indexed by spectrum_id to make
-    # the rest of the parsing a bit simpler
-    # spec_dict = {spec.spectrum_id: spec for spec in spectra}
-
-    # add edges info to the spectra
-    molfams = make_families(list(spec_dict.values()))
-    # molfams = GNPSMolecularFamilyLoader(edges_file).families()
-
-    unknown_strains = load_gnps(
-        strains, nodes_file, quant_table_file, metadata_table_file, ext_metadata_parsing, spec_dict
-    )
-
-    return spec_dict, list(spec_dict.values()), molfams, unknown_strains
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSSpectrumLoader class instead.")
-def load_spectra(mgf_file: str | PathLike, edges_file: str | PathLike) -> dict[str, Spectrum]:
-    """Wrapper function to load spectra and init the molecular family links.
-
-    Args:
-        mgf_file(str | PathLike): File storing the mass spectra in MGF format.
-        edges_file(str | PathLike): File storing the molecular family information in .selfloop or .pairsinfo format.
-
-    Returns:
-        dict[str, Spectrum]: Indexed dict of mass spectra.
-    """
-    ms1, ms2, metadata = LoadMGF(name_field="scans").load_spectra([str(mgf_file)])
-    logger.info("%d molecules parsed from MGF file", len(ms1))
-    # _mols_to_spectra returns a list; create a dict indexed by spectrum_id to make
-    # the rest of the parsing a bit simpler
-    spectra = _mols_to_spectra(ms2, metadata)
-    spec_dict: dict[str, Spectrum] = {spec.spectrum_id: spec for spec in spectra}
-    load_edges(edges_file, spec_dict)
-    return spec_dict
-
-
-@deprecated(version="1.3.3", reason="Use the GNPSMolecularFamilyLoader class instead.")
-def make_families(spectra: list[Spectrum]) -> list[MolecularFamily]:
-    """Instantiate the MolecularFamily objects given the cluster IDs added to the Spectrum objects.
-
-    Args:
-        spectra(list[Spectrum]): Spectrum objects from which to read the cluster IDs and put them into molecular families.
-
-    Returns:
-        list[MolecularFamily]: Molecular families created from the IDs present in the spectra.
- """ - families = [] - family_dict = {} - family_index = 0 - fams, singles = 0, 0 - for spectrum in spectra: - family_id = spectrum.family_id - if family_id == "-1": # singleton - new_family = SingletonFamily() - new_family.id = family_index - family_index += 1 - - new_family.add_spectrum(spectrum) - spectrum.family = new_family - families.append(new_family) - singles += 1 - else: - if family_id not in family_dict: - new_family = MolecularFamily(family_id) - new_family.id = family_index - new_family.family_id = family_id # preserve original ID here - family_index += 1 - - new_family.add_spectrum(spectrum) - spectrum.family = new_family - families.append(new_family) - family_dict[family_id] = new_family - fams += 1 - else: - family_dict[family_id].add_spectrum(spectrum) - spectrum.family = family_dict[family_id] - - logger.debug("make_families: {} molams + {} singletons".format(fams, singles)) - return families diff --git a/tests/conftest.py b/tests/conftest.py index 736a5b3e..0a7efcfd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,18 +1,8 @@ import pytest from nplinker.globals import STRAIN_MAPPINGS_FILENAME -from nplinker.metabolomics import Spectrum -from nplinker.metabolomics.metabolomics import load_spectra from nplinker.strain import Strain from nplinker.strain_collection import StrainCollection from . import DATA_DIR -from . import GNPS_DATA_DIR - - -@pytest.fixture -def spec_dict() -> dict[str, Spectrum]: - mgf_file = GNPS_DATA_DIR / "spectra.mgf" - edges_file = GNPS_DATA_DIR / "edges.pairsinfo" - return load_spectra(mgf_file, edges_file) @pytest.fixture diff --git a/tests/metabolomics/test_load_gnps.py b/tests/metabolomics/test_load_gnps.py deleted file mode 100644 index 880a893c..00000000 --- a/tests/metabolomics/test_load_gnps.py +++ /dev/null @@ -1,69 +0,0 @@ -from itertools import chain -import pytest -from nplinker.metabolomics.gnps import GNPSFormat -from nplinker.metabolomics.load_gnps import _load_clusterinfo_old -from nplinker.metabolomics.load_gnps import _messy_strain_naming_lookup -from nplinker.metabolomics.load_gnps import _parse_mzxml_header -from nplinker.metabolomics.load_gnps import load_gnps -from nplinker.strain_collection import StrainCollection -from nplinker.utils import get_headers -from .. 
-
-
-nodes_file = GNPS_DATA_DIR / "nodes.tsv"
-strains = StrainCollection()
-
-
-def test_load_gnps(spec_dict):
-    unknown_strains = load_gnps(strains, nodes_file, None, None, None, spec_dict)
-
-    assert len(unknown_strains) > 0
-
-
-def test_load_clusterinfo_old(spec_dict):
-    metadata_keys_before = set(chain(*[x.metadata.copy().keys() for x in spec_dict.values()]))
-
-    assert "files" not in metadata_keys_before
-    assert "cluster_index" not in metadata_keys_before
-
-    sut = _load_clusterinfo_old(GNPSFormat.SNETS, strains, nodes_file, spec_dict)
-
-    metadata_keys_after = set(chain(*[x.metadata.keys() for x in spec_dict.values()]))
-
-    assert len(metadata_keys_after) == len(metadata_keys_before) + 2
-
-    assert len(sut) > 0
-
-    for spectrum_id, spec in spec_dict.items():
-        metadata = spec.metadata
-        assert len(metadata.get("files")) > 1
-        assert isinstance(metadata.get("cluster_index"), str)
-        assert spectrum_id == metadata.get("cluster_index")
-
-
-@pytest.mark.parametrize(
-    "messy_alias, expected",
-    [
-        ["42b.mzXML", "42b.mzXML"],
-        ["blub", None],
-        ["42b.mzXML.copy", "42b.mzXML"],
-        ["Salinispora arenicola CNB527_blub", "42b.mzXML"],
-        ["GCF_000514775.1", "9b.mzXML"],
-    ],
-)
-def test_messy_strain_naming_lookup(
-    collection_from_file: StrainCollection, messy_alias: str, expected: str | None
-):
-    actual = _messy_strain_naming_lookup(messy_alias, collection_from_file)
-
-    if expected is not None:
-        assert actual == collection_from_file.lookup(expected)
-    else:
-        assert actual == expected
-
-
-def test_parse_mzxml_header():
-    headers = get_headers(str(GNPS_DATA_DIR / "nodes_fbmn.csv"))
-    hdr = headers[10]
-    actual = _parse_mzxml_header(hdr, StrainCollection(), None, None)
-    assert actual is not None
diff --git a/tests/metabolomics/test_metabolomics.py b/tests/metabolomics/test_metabolomics.py
deleted file mode 100644
index 94201cd8..00000000
--- a/tests/metabolomics/test_metabolomics.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from nplinker.metabolomics.metabolomics import load_dataset
-from nplinker.metabolomics.metabolomics import make_families
-from nplinker.strain_collection import StrainCollection
-from .. import GNPS_DATA_DIR
-
-
-def test_load_spectra(spec_dict):
-    assert len(spec_dict.keys()) > 0
-
-
-def test_load_dataset():
-    strains = StrainCollection()
-    mgf_file = GNPS_DATA_DIR / "spectra.mgf"
-    edges_file = GNPS_DATA_DIR / "edges.pairsinfo"
-    nodes_file = GNPS_DATA_DIR / "nodes.tsv"
-
-    spec_dict, spectra, _, _ = load_dataset(strains, mgf_file, edges_file, nodes_file)
-
-    assert isinstance(spec_dict, dict)
-    assert len(spectra) > 1
-
-
-def test_make_families(spec_dict):
-    families = make_families(spec_dict.values())
-    assert len(families) == 25769
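
---
Note on the replacement API: the deprecation messages in the removed modules name the new loader classes (GNPSSpectrumLoader, GNPSMolecularFamilyLoader, GNPSAnnotationLoader replacing load_annotations, GNPSFileMappingLoader replacing load_gnps), and the removed load_dataset() already carried the intended calls as commented-out code. Below is a minimal sketch of the replacement flow; the import locations are assumptions, and the method names are taken only from those commented-out lines, so none of this is confirmed by the patch itself.

    # Sketch only: import paths are assumed; .spectra() and .families() mirror
    # the commented-out calls inside the removed load_dataset().
    from nplinker.metabolomics.gnps import GNPSSpectrumLoader
    from nplinker.metabolomics.gnps import GNPSMolecularFamilyLoader

    mgf_file = "spectra.mgf"        # spectra exported by GNPS
    edges_file = "edges.pairsinfo"  # molecular network edges file

    # replaces load_spectra(): parse the MGF file into Spectrum objects
    spectra = GNPSSpectrumLoader(mgf_file).spectra()
    spec_dict = {spec.spectrum_id: spec for spec in spectra}

    # replaces load_edges()/make_families(): build molecular families from the edges file
    molfams = GNPSMolecularFamilyLoader(edges_file).families()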