From a3f6bfb42d2f8da231d2467b7835acc4f9b94981 Mon Sep 17 00:00:00 2001 From: haeussma <83341109+haeussma@users.noreply.github.com> Date: Sun, 3 Mar 2024 12:43:19 +0100 Subject: [PATCH] moved parsers in own files --- chromatopy/readers/__init__.py | 3 +- chromatopy/readers/abstractreader.py | 263 +++------------------------ chromatopy/readers/chemstation.py | 16 ++ chromatopy/readers/csv.py | 16 ++ chromatopy/readers/shimadzu.py | 212 +++++++++++++++++++++ examples/kr/lc.ipynb | 4 +- specifications/chromatography.md | 5 +- 7 files changed, 280 insertions(+), 239 deletions(-) create mode 100644 chromatopy/readers/chemstation.py create mode 100644 chromatopy/readers/csv.py create mode 100644 chromatopy/readers/shimadzu.py diff --git a/chromatopy/readers/__init__.py b/chromatopy/readers/__init__.py index ccd1034..fbc8a24 100644 --- a/chromatopy/readers/__init__.py +++ b/chromatopy/readers/__init__.py @@ -1,2 +1 @@ -from .abstractreader import CSVReader -from .abstractreader import ShimadzuReader +from .shimadzu import ShimadzuReader diff --git a/chromatopy/readers/abstractreader.py b/chromatopy/readers/abstractreader.py index 65e49e5..00c56bd 100644 --- a/chromatopy/readers/abstractreader.py +++ b/chromatopy/readers/abstractreader.py @@ -1,20 +1,39 @@ from abc import ABC, abstractmethod -from datetime import datetime -from io import StringIO import os -import pathlib -from typing import Optional -import re - -import pandas as pd class AbstractReader(ABC): + """ + AbstractReader is an abstract base class that defines the interface for reading and processing + data from a file or directory. + + Attributes: + path (str): The path to the file or directory. + _is_directory (bool): Indicates whether the path is a directory or not. + + Methods: + __init__(self, path: str): Initializes the AbstractReader object with the specified path. + _validate_path(self): Validates the path to ensure it exists and is either a file or directory. + _paths(self): Returns a list of paths to be processed based on the type of the input path. + read(self): Abstract method to read the data from the file or directory. + read_file(self): Abstract method to read the data from a single file. + extract_peaks(self): Abstract method to extract peaks from the data. + extract_signal(self): Abstract method to extract the signal from the data. + """ def __init__(self, path: str): + self._validate_path() self.path = path self._is_directory: bool = os.path.isdir(path) + def _validate_path(self): + if not os.path.exists(self.path): + raise FileNotFoundError(f"Path '{self.path}' does not exist.") + if self._is_directory and not os.path.isdir(self.path): + raise NotADirectoryError(f"Path '{self.path}' is not a directory.") + if not self._is_directory and not os.path.isfile(self.path): + raise FileNotFoundError(f"Path '{self.path}' is not a file.") + def _paths(self): if self._is_directory: return [os.path.join(self.path, f) for f in os.listdir(self.path)] @@ -23,240 +42,16 @@ def _paths(self): @abstractmethod def read(self): - raise NotImplementedError + pass @abstractmethod def read_file(self): - raise NotImplementedError() - - @abstractmethod - def extract_peaks(self): - raise NotImplementedError() - - @abstractmethod - def extract_signal(self): - raise NotImplementedError() - - -class CSVReader(AbstractReader): - - def read(self): pass - def read_csv(self): - data = pd.read_csv(self.path, header=None) - return data - + @abstractmethod def extract_peaks(self): pass + @abstractmethod def extract_signal(self): pass - - -class ChemStationReader(AbstractReader): - - def read_file(self): - pass - - -class ShimadzuReader(AbstractReader): - RE_SECTION = re.compile(r"\[(.*)\]") - - def _paths(self): - if self._is_directory: - return [ - os.path.join(self.path, f) - for f in os.listdir(self.path) - if f.endswith(".txt") - ] - else: - return [self.path] - - def read(self): - return [self.read_file(f) for f in self._paths()] - - def read_file(self, path: str): - """ - Reads the contents of one or multiple files and returns them as a list of strings. - - Returns: - A list of strings, where each string represents the contents of a file. - """ - content = pathlib.Path(path).read_text(encoding="ISO-8859-1") - sections = self._parse_sections(content) - - measurement_dict = self._map_measurement(sections) - peak_dict = self.extract_peaks(sections) - chromatogram_dict = self.extract_signal(sections) - chromatogram_dict["peaks"] = peak_dict - measurement_dict["chromatograms"] = [chromatogram_dict] - - return measurement_dict - - def _map_measurement(self, sections: dict) -> dict: - header = self.get_header(sections) - timestamp = datetime.strptime( - f"{header['Output Date'].rstrip('.')} {header['Output Time']}", - "%d.%m.%Y %H:%M:%S", - ) - sample_info = self.get_sample_information(sections) - dilution_factor = sample_info.get("Dilution Factor", 1) - injection_volume = sample_info.get("Injection Volume", None) - try: - injection_volume = float(injection_volume) / float(dilution_factor) - except TypeError: - raise ValueError("Injection volume not found in sample information") - - return { - "timestamp": timestamp, - "injection_volume": injection_volume, - "injection_volume_unit": "µL", - } - - def extract_peaks(self, sections: dict): - table = self.get_peak_table(sections) - return self._map_peak_table(table) - - def extract_signal(self, sections) -> dict: - table = self.get_chromatogram_table(sections) - return self._map_chromatogram_table(table) - - def _parse_sections(self, file_content: str) -> dict: - """Parse a Shimadzu ASCII-export file into sections.""" - - # Split file into sections using section header pattern - section_splits = re.split(self.RE_SECTION, file_content) - if len(section_splits[0]) != 0: - raise IOError("The file should start with a section header") - - section_names = section_splits[1::2] - section_contents = [content for content in section_splits[2::2]] - - return dict(zip(section_names, section_contents)) - - def parse_meta(self, sections: dict, section_name: str, nrows: int) -> dict: - """Parse the metadata in a section as keys-values.""" - - meta_table = ( - pd.read_table( - StringIO(sections[section_name]), - nrows=nrows, - header=None, - sep=",", - ) - .set_index(0)[1] - .to_dict() - ) - - return meta_table - - def parse_table( - self, sections: dict, section_name: str, skiprows: int = 1 - ) -> Optional[pd.DataFrame]: - """Parse the data in a section as a table.""" - table_str = sections[section_name] - - # Count number of non-empty lines - num_lines = len(table_str.splitlines()) - - if num_lines <= 1: - return None - - return pd.read_table(StringIO(table_str), header=1, skiprows=skiprows, sep=",") - - def _map_peak_table(self, table: pd.DataFrame) -> dict: - retention_time_col = "R.Time" - height_col = "Height" - area_col = "Area" - peak_start_col = "I.Time" - peak_end_col = "F.Time" - tailing_col = "Tailing" - separation_col = "Sep.Factor" - peak_start_col = "I.Time" - peak_end_col = "F.Time" - - return table.apply( - lambda row: { - "retention_time": row[retention_time_col], - "retention_time_unit": "min", - "peak_start": row[peak_start_col], - "peak_end": row[peak_end_col], - "height": row[height_col], - "area": row[area_col], - "width": row[peak_end_col] - row[peak_start_col], - "width_unit": "min", - "tailing_factor": row[tailing_col], - "separation_factor": row[separation_col], - }, - axis=1, - ).tolist() - - def _map_chromatogram_table(self, table: pd.DataFrame) -> dict: - """ - Maps the chromatogram table to a dictionary format. - - Args: - table (pd.DataFrame): The chromatogram table. - - Returns: - dict: The mapped chromatogram dictionary. - """ - return { - "retention_times": table["R.Time (min)"].tolist(), - "signals": table["Value (mV)"].tolist(), - "time_unit": "min", - } - - def get_peak_table( - self, sections: dict, detector: str = "A-Ch1" - ) -> Optional[pd.DataFrame]: - section_name = f"Peak Table(Detector {detector})" - table = self.parse_table(sections, section_name, skiprows=1) - - return table - - def get_compound_table( - self, sections: dict, detector: str = "A" - ) -> Optional[pd.DataFrame]: - section_name = f"Compound Results(Detector {detector})" - meta = self.parse_meta(sections, section_name, 1) - table = self.parse_table(sections, section_name, skiprows=1) - - assert ( - table is None or int(meta["# of IDs"]) == table.shape[0] - ), "Declared number of compounds and table size differ" - - return table - - def get_chromatogram_table( - self, sections: dict, detector: str = "A", channel: int = 1 - ) -> Optional[pd.DataFrame]: - section_name = f"LC Chromatogram(Detector {detector}-Ch{channel})" - - meta = self.parse_meta(sections, section_name, 6) - table = self.parse_table(sections, section_name, skiprows=7) - - # Convert intensity values into what they are supposed to be - table["Value (mV)"] = table["Intensity"] * float(meta["Intensity Multiplier"]) - - assert ( - meta["Intensity Units"] == "mV" - ), f"Assumed intensity units in mV but got {meta['Intensity Units']}" - assert ( - int(meta["# of Points"]) == table.shape[0] - ), "Declared number of points and table size differ" - - return table - - def get_header(self, sections: dict) -> dict: - return self.parse_meta(sections, "Header", nrows=None) - - def get_file_information(self, sections: dict) -> dict: - return self.parse_meta(sections, "File Information", nrows=None) - - def get_original_files(self, sections: dict) -> dict: - return self.parse_meta(sections, "Original Files", nrows=None) - - def get_sample_information(self, sections: dict) -> dict: - return self.parse_meta(sections, "Sample Information", nrows=None) diff --git a/chromatopy/readers/chemstation.py b/chromatopy/readers/chemstation.py new file mode 100644 index 0000000..23924b7 --- /dev/null +++ b/chromatopy/readers/chemstation.py @@ -0,0 +1,16 @@ +from .abstractreader import AbstractReader + + +class ChemStationReader(AbstractReader): + + def read(self): + raise NotImplementedError + + def read_file(self): + raise NotImplementedError() + + def extract_peaks(self): + raise NotImplementedError() + + def extract_signal(self): + raise NotImplementedError() diff --git a/chromatopy/readers/csv.py b/chromatopy/readers/csv.py new file mode 100644 index 0000000..fa11a8d --- /dev/null +++ b/chromatopy/readers/csv.py @@ -0,0 +1,16 @@ +from .abstractreader import AbstractReader + + +class CSVReader(AbstractReader): + + def read(self): + raise NotImplementedError + + def read_file(self): + raise NotImplementedError() + + def extract_peaks(self): + raise NotImplementedError() + + def extract_signal(self): + raise NotImplementedError() diff --git a/chromatopy/readers/shimadzu.py b/chromatopy/readers/shimadzu.py new file mode 100644 index 0000000..825ab0d --- /dev/null +++ b/chromatopy/readers/shimadzu.py @@ -0,0 +1,212 @@ +from typing import Optional +import os +import re +import pathlib +from datetime import datetime +from io import StringIO +import pandas as pd + +from chromatopy.readers.abstractreader import AbstractReader + + +class ShimadzuReader(AbstractReader): + RE_SECTION = re.compile(r"\[(.*)\]") + + def _paths(self): + if self._is_directory: + return [ + os.path.join(self.path, f) + for f in os.listdir(self.path) + if f.endswith(".txt") + ] + else: + return [self.path] + + def read(self): + return [self.read_file(f) for f in self._paths()] + + def read_file(self, path: str): + """ + Reads the contents of one or multiple files and returns them as a list of strings. + + Returns: + A list of strings, where each string represents the contents of a file. + """ + content = pathlib.Path(path).read_text(encoding="ISO-8859-1") + sections = self._parse_sections(content) + + measurement_dict = self._map_measurement(sections) + peak_dict = self.extract_peaks(sections) + chromatogram_dict = self.extract_signal(sections) + chromatogram_dict["peaks"] = peak_dict + measurement_dict["chromatograms"] = [chromatogram_dict] + + return measurement_dict + + def _map_measurement(self, sections: dict) -> dict: + header = self.get_header(sections) + timestamp = datetime.strptime( + f"{header['Output Date'].rstrip('.')} {header['Output Time']}", + "%d.%m.%Y %H:%M:%S", + ) + sample_info = self.get_sample_information(sections) + dilution_factor = sample_info.get("Dilution Factor", 1) + injection_volume = sample_info.get("Injection Volume", None) + try: + injection_volume = float(injection_volume) / float(dilution_factor) + except TypeError: + raise ValueError("Injection volume not found in sample information") + + return { + "timestamp": timestamp, + "injection_volume": injection_volume, + "injection_volume_unit": "µL", + "type": "UV", + } + + def extract_peaks(self, sections: dict): + table = self.get_peak_table(sections) + return self._map_peak_table(table) + + def extract_signal(self, sections) -> dict: + table = self.get_chromatogram_table(sections) + return self._map_chromatogram_table(table) + + def _parse_sections(self, file_content: str) -> dict: + """Parse a Shimadzu ASCII-export file into sections.""" + + # Split file into sections using section header pattern + section_splits = re.split(self.RE_SECTION, file_content) + if len(section_splits[0]) != 0: + raise IOError("The file should start with a section header") + + section_names = section_splits[1::2] + section_contents = [content for content in section_splits[2::2]] + + return dict(zip(section_names, section_contents)) + + def parse_meta(self, sections: dict, section_name: str, nrows: int) -> dict: + """Parse the metadata in a section as keys-values.""" + + meta_table = ( + pd.read_table( + StringIO(sections[section_name]), + nrows=nrows, + header=None, + sep=",", + ) + .set_index(0)[1] + .to_dict() + ) + + return meta_table + + def parse_table( + self, sections: dict, section_name: str, skiprows: int = 1 + ) -> Optional[pd.DataFrame]: + """Parse the data in a section as a table.""" + table_str = sections[section_name] + + # Count number of non-empty lines + num_lines = len(table_str.splitlines()) + + if num_lines <= 1: + return None + + return pd.read_table(StringIO(table_str), header=1, skiprows=skiprows, sep=",") + + def _map_peak_table(self, table: pd.DataFrame) -> dict: + retention_time_col = "R.Time" + height_col = "Height" + area_col = "Area" + peak_start_col = "I.Time" + peak_end_col = "F.Time" + tailing_col = "Tailing" + separation_col = "Sep.Factor" + peak_start_col = "I.Time" + peak_end_col = "F.Time" + + return table.apply( + lambda row: { + "retention_time": row[retention_time_col], + "retention_time_unit": "min", + "peak_start": row[peak_start_col], + "peak_end": row[peak_end_col], + "height": row[height_col], + "area": row[area_col], + "width": row[peak_end_col] - row[peak_start_col], + "width_unit": "min", + "tailing_factor": row[tailing_col], + "separation_factor": row[separation_col], + }, + axis=1, + ).tolist() + + def _map_chromatogram_table(self, table: pd.DataFrame) -> dict: + """ + Maps the chromatogram table to a dictionary format. + + Args: + table (pd.DataFrame): The chromatogram table. + + Returns: + dict: The mapped chromatogram dictionary. + """ + return { + "retention_times": table["R.Time (min)"].tolist(), + "signals": table["Value (mV)"].tolist(), + "time_unit": "min", + } + + def get_peak_table( + self, sections: dict, detector: str = "A-Ch1" + ) -> Optional[pd.DataFrame]: + section_name = f"Peak Table(Detector {detector})" + table = self.parse_table(sections, section_name, skiprows=1) + + return table + + def get_compound_table( + self, sections: dict, detector: str = "A" + ) -> Optional[pd.DataFrame]: + section_name = f"Compound Results(Detector {detector})" + meta = self.parse_meta(sections, section_name, 1) + table = self.parse_table(sections, section_name, skiprows=1) + + assert ( + table is None or int(meta["# of IDs"]) == table.shape[0] + ), "Declared number of compounds and table size differ" + + return table + + def get_chromatogram_table( + self, sections: dict, detector: str = "A", channel: int = 1 + ) -> Optional[pd.DataFrame]: + section_name = f"LC Chromatogram(Detector {detector}-Ch{channel})" + + meta = self.parse_meta(sections, section_name, 6) + table = self.parse_table(sections, section_name, skiprows=7) + + # Convert intensity values into what they are supposed to be + table["Value (mV)"] = table["Intensity"] * float(meta["Intensity Multiplier"]) + + assert ( + meta["Intensity Units"] == "mV" + ), f"Assumed intensity units in mV but got {meta['Intensity Units']}" + assert ( + int(meta["# of Points"]) == table.shape[0] + ), "Declared number of points and table size differ" + + return table + + def get_header(self, sections: dict) -> dict: + return self.parse_meta(sections, "Header", nrows=None) + + def get_file_information(self, sections: dict) -> dict: + return self.parse_meta(sections, "File Information", nrows=None) + + def get_original_files(self, sections: dict) -> dict: + return self.parse_meta(sections, "Original Files", nrows=None) + + def get_sample_information(self, sections: dict) -> dict: + return self.parse_meta(sections, "Sample Information", nrows=None) diff --git a/examples/kr/lc.ipynb b/examples/kr/lc.ipynb index a1b7b97..2573e52 100644 --- a/examples/kr/lc.ipynb +++ b/examples/kr/lc.ipynb @@ -14,8 +14,8 @@ "outputs": [], "source": [ "from sdRDM.generator import generate_python_api\n", - "generate_python_api(\"../../specifications/chromatography.md\",\n", - " \"../../\", \"chromatopy\")" + "\n", + "generate_python_api(\"../../specifications/chromatography.md\", \"../../\", \"chromatopy\")" ] }, { diff --git a/specifications/chromatography.md b/specifications/chromatography.md index c0d4965..10b06e3 100644 --- a/specifications/chromatography.md +++ b/specifications/chromatography.md @@ -77,6 +77,9 @@ - Type: float - Description: Signal values - Multiple: True +- wavelength + - Type: float + - Description: Wavelength of the signal in nm - type - Type: SignalType - Description: Type of signal @@ -276,7 +279,7 @@ Describes properties of a column and its connections to the inlet and detector. - Description: Inlet of the column - detector - Type: Detector - - Description: Outlet of the column, connected to detector + - Description: Outlet of the column, connected to the detector - outlet_pressure - Type: float - Description: Outlet pressure of the column