Skip to content

Commit

Permalink
added chemstation
Browse files Browse the repository at this point in the history
  • Loading branch information
haeussma committed Mar 15, 2024
1 parent 4d3e374 commit 2bbfa32
Show file tree
Hide file tree
Showing 5 changed files with 6,782 additions and 152 deletions.
261 changes: 111 additions & 150 deletions chromatopy/readers/chemstation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
from .abstractreader import AbstractReader
from pathlib import Path
from datetime import datetime
import re

from chromatopy.readers.abstractreader import AbstractReader


class ChemStationReader(AbstractReader):
class ChemstationReader(AbstractReader):

def _paths(self):
    """Return the report files this reader should process.

    When ``self._is_directory`` is true, every file named ``Report.TXT``
    below ``self.path`` is collected recursively. Otherwise ``self.path``
    itself is returned as the only entry.

    Returns:
        list: ``pathlib.Path`` objects (sorted) for a directory, or
        ``[self.path]`` unchanged for a single file.
    """
    if self._is_directory:
        # rglob yields entries in filesystem order; sort so the resulting
        # measurements come out in a reproducible order.
        return sorted(Path(self.path).rglob("Report.TXT"))
    return [self.path]

def read(self):
    """Parse every report file found by ``_paths`` into a measurement.

    Each path is loaded exactly once with ``read_file`` and the loaded
    content is handed to ``parse_measurement``.

    Returns:
        list: One parsed measurement per path, in the order ``_paths``
        returns them.
    """
    return [
        self.parse_measurement(self.read_file(path))
        for path in self._paths()
    ]

def read_file(self, path: str) -> str:

Expand All @@ -21,180 +39,123 @@ def read_file(self, path: str) -> str:
except UnicodeError:
raise UnicodeError()

def extract_peaks(self):
    """Placeholder: peak extraction is not implemented here.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        f"{type(self).__name__} does not implement extract_peaks()"
    )

def extract_signal(self):
    """Placeholder: signal extraction is not implemented here.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        f"{type(self).__name__} does not implement extract_signal()"
    )


"""
import re
import os
import pandas as pd
from datetime import datetime
from HPLC.core.hplcexperiment import HPLCExperiment
from HPLC.core.measurement import Measurement
from HPLC.core.signal import Signal
from HPLC.core.signaltype import SignalType
from HPLC.core.method import Method
def _read_file(path: str):
    """Read a report file and return its lines.

    The file is first decoded as UTF-16; in that case the lines are returned
    exactly as read, line endings included. If UTF-16 decoding fails, the
    file is re-read with the default text encoding and each line is returned
    with surrounding whitespace stripped.

    Args:
        path: Location of the file to read.

    Returns:
        list[str]: The lines of the file.

    Raises:
        UnicodeError: If the file can be decoded with neither encoding. The
            original decode error is propagated so its codec and position
            details are kept.
    """
    try:
        with open(path, encoding="utf-16") as handle:
            return handle.readlines()
    except UnicodeError:
        # Not UTF-16: fall through to the default encoding below.
        # NOTE(review): a non-UTF-16 file without a BOM may still decode here
        # without raising -- confirm inputs always carry a BOM.
        pass

    with open(path) as handle:
        return [line.strip() for line in handle]
def parse_measurement(self, file: str):
from chromatopy.core import Measurement, Peak, Chromatogram, SignalType

INJ_VOLUME = re.compile(r"(\d+\s+(µ?[a-zA-Z]?l))")
TIMESTAMP = re.compile(
r"\d{1,2}\/\d{1,2}\/\d{2,4} \d{1,2}:\d{2}:\d{2} (?:AM|PM)"
)
SIGNAL = re.compile(r"\bSignal\b \d+:")
PEAK = re.compile(r"^ +\d+")

def parse_method(path: str) -> list:
    """Locate the sections of a method file.

    A section starts at a line made up only of upper-case letters, digits
    and whitespace that does not begin with a digit or whitespace. It ends
    at the next blank line, or at the end of the file if no blank line
    follows.

    Args:
        path: Location of the method file, read via ``_read_file``.

    Returns:
        list[slice]: One slice per section, covering the header line up to
        (not including) the terminating blank line.
    """
    SECTION_START = re.compile(r"^(?![\d\s])[\dA-Z\s]+$")

    lines = _read_file(path)

    section_slices = []
    section_start = None
    for line_id, line in enumerate(lines):
        if SECTION_START.search(line):
            section_start = line_id
        elif section_start is not None and not line.strip():
            # Compare on the stripped text: lines may still carry their
            # line ending, so a blank line is not always exactly "".
            section_slices.append(slice(section_start, line_id))
            section_start = None

    # A section that runs to the end of the file has no closing blank line.
    if section_start is not None:
        section_slices.append(slice(section_start, len(lines)))

    return section_slices
def _get_peak(line: str) -> dict:
    """Split one fixed-width peak-table row into typed fields.

    Each field is cut from a fixed character range of ``line``, stripped of
    surrounding whitespace and converted to its target type.

    Args:
        line: A single peak row of the report.

    Returns:
        dict: Keys ``id``, ``retention_time``, ``type``, ``width``, ``area``,
        ``height`` and ``percent_area``; ``id`` and ``type`` are strings, the
        rest are floats.

    Raises:
        ValueError: If a numeric column is empty or not a number (for
            example because the row is shorter than expected). The message
            names the offending field and row.
    """
    columns = (
        ("id", slice(0, 4), str),
        ("retention_time", slice(5, 12), float),
        ("type", slice(13, 17), str),
        ("width", slice(18, 25), float),
        ("area", slice(26, 36), float),
        ("height", slice(37, 47), float),
        ("percent_area", slice(48, 56), float),
    )

    peak = {}
    for key, span, cast in columns:
        text = line[span].strip()
        try:
            peak[key] = cast(text)
        except ValueError as err:
            raise ValueError(
                f"cannot parse {key!r} from {text!r} in peak row {line!r}"
            ) from err
    return peak
def _get_peak_units(line: str) -> dict:
unit_slice_dict = {
"retention_time_unit": slice(5, 12),
"width_unit": slice(18, 25),
"area_unit": slice(26, 36),
"height_unit": slice(37, 47),
}
units = {}
for key, unit_slice in unit_slice_dict.items():
units[key] = line[unit_slice].strip().strip("[]")
measurement = Measurement()

return units
signal_slices = []
for line_count, line in enumerate(file):
if INJ_VOLUME.search(line):
injection_volume, volume_unit = INJ_VOLUME.search(line)[0].split()
measurement.injection_volume = float(injection_volume)
measurement.injection_volume_unit = volume_unit

if line.startswith("Injection Date"):
date_str = TIMESTAMP.search(line)[0]
timestamp = datetime.strptime(date_str, "%m/%d/%Y %I:%M:%S %p")
measurement.timestamp = timestamp

def parse_measurement(path: str) -> Measurement:
# Identify slices which describe signal blocks
if SIGNAL.search(line) and file[line_count + 1] == "\n":
signal_start = line_count
if line.startswith("Totals :"):
signal_end = line_count
signal_slices.append(slice(signal_start, signal_end))

INJ_VOLUME = re.compile("(\d+\s+(µ?[a-zA-Z]?l))")
TIMESTAMP = re.compile("\d{1,2}\/\d{1,2}\/\d{2,4} \d{1,2}:\d{2}:\d{2} (?:AM|PM)")
SIGNAL = re.compile(r"\bSignal\b \d+:")
PEAK = re.compile("^ +\d+")
# Parse peak data for each signal type
for signal_slice in signal_slices:

lines = _read_file(path)
signal = Chromatogram()

measurement = Measurement()
for line in file[signal_slice]:

signal_slices = []
for line_count, line in enumerate(lines):
if INJ_VOLUME.search(line):
injection_volume, volume_unit = INJ_VOLUME.search(line)[0].split()
measurement.injection_volume = float(injection_volume)
measurement.injection_volume_unit = volume_unit
if line.startswith("Signal"):
signal_type = line.split(":")[1].split()[0]
signal_type = re.findall("[A-Za-z]+", signal_type)[0]
signal.type = SignalType[signal_type]
continue

if line.startswith("Injection Date"):
date_str = TIMESTAMP.search(line)[0]
timestamp = datetime.strptime(date_str, "%m/%d/%Y %I:%M:%S %p")
measurement.timestamp = timestamp
if line.startswith(" # "):
peak_units = self._get_peak_units(line)
continue

# Identify slices which describe signal blocks
if SIGNAL.search(line) and lines[line_count + 1] == "\n":
signal_start = line_count
if line.startswith("Totals :"):
signal_end = line_count
signal_slices.append(slice(signal_start, signal_end))
if PEAK.search(line):
peak_values = self._get_peak(line)

# Parse peak data for each signal type
for signal_slice in signal_slices:
signal.add_to_peaks(**(peak_values | peak_units))

signal = Signal()
measurement.chromatograms.append(signal)

for line in lines[signal_slice]:
return measurement

if line.startswith("Signal"):
signal_type = line.split(":")[1].split()[0]
signal_type = re.findall("[A-Za-z]+", signal_type)[0]
signal.type = signal_type.lower()
continue
def _get_peak(self, line: str) -> dict:

if line.startswith(" # "):
peak_units = _get_peak_units(line)
continue
attr_slice_dict = {
"id": (slice(0, 4), str),
"retention_time": (slice(5, 12), float),
"type": (slice(13, 17), str),
"width": (slice(18, 25), float),
"area": (slice(26, 36), float),
"height": (slice(37, 47), float),
"percent_area": (slice(48, 56), float),
}

if PEAK.search(line):
peak_values = _get_peak(line)
peak = {}
for key, (attr_slice, attr_type) in attr_slice_dict.items():
peak[key] = attr_type(line[attr_slice].strip())

signal.add_to_peaks(**(peak_values | peak_units))
return peak

measurement.signals.append(signal)
def _get_peak_units(self, line: str) -> dict:

return measurement
unit_slice_dict = {
"retention_time_unit": slice(5, 12),
"width_unit": slice(18, 25),
"area_unit": slice(26, 36),
"height_unit": slice(37, 47),
}

units = {}
for key, unit_slice in unit_slice_dict.items():
units[key] = line[unit_slice].strip().strip("[]")

def parse_experiment(path: str) -> HPLCExperiment:
return units

peak_file_name = "Report.TXT"
def parse_method(self, file: str):

experiment = HPLCExperiment()
SECTION_START = re.compile(r"^(?![\d\s])[\dA-Z\s]+$")

for dir in sorted(os.listdir(path)):
if dir.endswith(".D"):
measurement_path = os.path.join(path, dir)
for file in os.listdir(measurement_path):
if file == peak_file_name:
measurement = parse_measurement(
os.path.join(measurement_path, file)
)
section_slices = []
section_started = False
for line_id, line in enumerate(file):
if SECTION_START.search(line):
section_started = True
section_start = line_id
if line == "" and section_started:
secion_end = line_id
section_slices.append(slice(section_start, secion_end))
section_started = False

experiment.measurements.append(measurement)
return section_slices

return experiment
def extract_peaks(self):
    """Placeholder: peak extraction is not implemented here.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        f"{type(self).__name__} does not implement extract_peaks()"
    )

def extract_signal(self):
    """Placeholder: signal extraction is not implemented here.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        f"{type(self).__name__} does not implement extract_signal()"
    )

def get_peak(signal_type: SignalType, peak_id: int):

if signal_type not in SignalType.__members__:
raise TypeError(
f"signal_type must be one of {[s_type.value for s_type in SignalType]}"
)
"""
if __name__ == "__main__":
    import sys

    # Manual smoke test: parse the first report found and print it.
    # The directory may be given on the command line; the original
    # development path is kept as the fallback.
    dir_path = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "/Users/max/Documents/training_course/hao"
    )
    cs = ChemstationReader(dir_path)
    paths = cs._paths()
    if not paths:
        raise SystemExit(f"no Report.TXT found under {dir_path}")
    # parse_measurement iterates over and indexes the loaded file content,
    # so the path has to go through read_file first.
    res = cs.parse_measurement(cs.read_file(paths[0]))
    print(res)
2 changes: 1 addition & 1 deletion chromatopy/readers/csv.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from datetime import datetime

from .abstractreader import AbstractReader
from chromatopy.readers.abstractreader import AbstractReader


class CSVReader(AbstractReader):
Expand Down
4 changes: 3 additions & 1 deletion chromatopy/readers/shimadzu.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def read(self):
return [self.read_file(f) for f in self._paths()]

def read_file(self, path: str):
from chromatopy.core import Measurement

"""
Reads the contents of one or multiple files and returns them as a list of strings.
Expand All @@ -43,7 +45,7 @@ def read_file(self, path: str):
chromatogram_dict["peaks"] = peak_dict
measurement_dict["chromatograms"] = [chromatogram_dict]

return measurement_dict
return Measurement(**measurement_dict)

def _get_content(self, path: str) -> str:
return pathlib.Path(path).read_text(encoding="ISO-8859-1")
Expand Down
Loading

0 comments on commit 2bbfa32

Please sign in to comment.