diff --git a/src/ingest_validation_tests/fastq_validator.py b/src/ingest_validation_tests/fastq_validator.py index b30ca75..d9acee4 100644 --- a/src/ingest_validation_tests/fastq_validator.py +++ b/src/ingest_validation_tests/fastq_validator.py @@ -1,4 +1,3 @@ -from os import cpu_count from typing import List, Optional from fastq_validator_logic import FASTQValidatorLogic, _log @@ -11,10 +10,10 @@ class FASTQValidator(Validator): version = "1.0" def collect_errors(self, **kwargs) -> List[Optional[str]]: - threads = kwargs.get("coreuse", None) or cpu_count() // 4 or 1 - _log(f"Threading at FastQValidator with {threads}") + del kwargs + _log(f"Threading at FastQValidator with {self.thread_count}") validator = FASTQValidatorLogic(verbose=True) - validator.validate_fastq_files_in_path(self.paths, threads) + validator.validate_fastq_files_in_path(self.paths, self.thread_count) if validator.errors: return validator.errors elif validator.files_were_found: diff --git a/src/ingest_validation_tests/fastq_validator_logic.py b/src/ingest_validation_tests/fastq_validator_logic.py index baf8e45..f5c4699 100644 --- a/src/ingest_validation_tests/fastq_validator_logic.py +++ b/src/ingest_validation_tests/fastq_validator_logic.py @@ -2,32 +2,51 @@ import gzip import logging import re -from collections import defaultdict -from multiprocessing import Lock, Manager, Pool +from collections import defaultdict, namedtuple +from multiprocessing import Manager, Pool +from multiprocessing.managers import ListProxy +from os import cpu_count from pathlib import Path -from typing import Callable, List, Optional, TextIO +from typing import Callable, List, Optional, TextIO, Union import fastq_utils +from typing_extensions import Self + +filename_pattern = namedtuple("filename_pattern", ["prefix", "read_type", "set_num"]) def is_valid_filename(filename: str) -> bool: return bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename)) +def get_prefix_read_type_and_set(filename: str) -> Optional[filename_pattern]: + if not bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename)): + return + pattern = re.compile( + r"(?P.*(?=[_](?:(?PR|.read|I)\d)))(?:_(?P=read_type)\d_)(?P\d+)" + ) + groups = pattern.match(filename) + if groups and all(x in groups.groupdict().keys() for x in ["prefix", "read_type", "set"]): + return filename_pattern( + groups.group("prefix"), groups.group("read_type"), groups.group("set") + ) + + def _open_fastq_file(file: Path) -> TextIO: return gzip.open(file, "rt") if file.name.endswith(".gz") else file.open() -def _log(message: str) -> str: - print(message) - return message +def _log(message: str, verbose: bool = True) -> Optional[str]: + if verbose: + print(message) + return message class Engine(object): def __init__(self, validate_object): self.validate_object = validate_object - def __call__(self, fastq_file): + def __call__(self, fastq_file) -> list[Optional[str]]: errors = [] _log(f"Validating matching fastq file {fastq_file}") self.validate_object.validate_fastq_file(fastq_file) @@ -56,8 +75,9 @@ class FASTQValidatorLogic: def __init__(self, verbose=False): self.errors: List[Optional[str]] = [] self.files_were_found = False + self.file_list = Manager().list() self._file_record_counts = Manager().dict() - self._file_prefix_counts = Manager().dict() + self._ungrouped_files = Manager().list() self._filename = "" self._line_number = 0 @@ -73,8 +93,7 @@ def _format_error(self, error: str) -> str: message = f"{location}: {error}" - if self._verbose: - _log(message) + print(message) return message def _validate_fastq_line_1(self, line: str) -> List[str]: @@ -113,6 +132,16 @@ def _validate_fastq_line_4(self, line: str) -> List[str]: ) return errors + def _make_groups(self) -> dict[filename_pattern, list[Path]]: + groups = defaultdict(list) + for file in self.file_list: + potential_match = get_prefix_read_type_and_set(file.name) + if potential_match: + groups[potential_match].append(file) + else: + self._ungrouped_files.append(file) + return groups + _VALIDATE_FASTQ_LINE_METHODS = { 1: _validate_fastq_line_1, 2: _validate_fastq_line_2, @@ -123,9 +152,9 @@ def _validate_fastq_line_4(self, line: str) -> List[str]: def validate_fastq_record(self, line: str, line_number: int) -> List[str]: line_index = line_number % 4 + 1 - validator_method: Callable[[FASTQValidatorLogic, str], List[str]] = ( - self._VALIDATE_FASTQ_LINE_METHODS[line_index] - ) + validator_method: Callable[[Self, str], List[str]] = self._VALIDATE_FASTQ_LINE_METHODS[ + line_index + ] assert validator_method, f"No validator method defined for record index {line_index}" @@ -179,97 +208,89 @@ def validate_fastq_file(self, fastq_file: Path) -> None: self._file_record_counts[str(fastq_file)] = records_read def validate_fastq_files_in_path(self, paths: List[Path], threads: int) -> None: - data_found_one = [] - dirs_and_files = defaultdict(dict) + """ + - Builds a list of filepaths; checks for duplicate filenames in upload. + - [parallel] Opens, validates, and gets line count of each file in list, and then + populates self._file_record_counts as {filepath: record_count}. + - If successful, groups files with matching prefix/read_type/set_num values. + - Compares record_counts across grouped files, logs any that don't match or are ungrouped. + """ for path in paths: - dirs_and_files[path] = fastq_utils.collect_fastq_files_by_directory(path) - _log( - f"FASTQValidatorLogic: Added files from {path} to dirs_and_files: {dirs_and_files}" - ) - file_list = [] + fastq_utils_output = fastq_utils.collect_fastq_files_by_directory(path) + for files in fastq_utils_output.values(): + self.file_list.extend(files) + _log(f"FASTQValidatorLogic: Added files from {path} to file_list: {files}") + self.files_were_found = bool(self.file_list) + self._find_duplicates() + _log(f"File list: {self.file_list}") with Manager() as manager: - # TODO: re-evaluate dicts/for loops lock = manager.Lock() - for path, rel_paths in dirs_and_files.items(): - for rel_path, files in rel_paths.items(): - for file in files: - file_list.append(Path(path / rel_path / file)) - if file_list: - self.files_were_found = True + pool = Pool(threads) try: logging.info( - f"Passing file list for paths {paths} to engine. File list: {file_list}." + f"Passing file list for paths {self._printable_filenames(paths, newlines=False)} to engine. File list:" ) - pool = Pool(threads) + logging.info(self._printable_filenames(self.file_list)) engine = Engine(self) - data_output = pool.imap_unordered(engine, file_list) + data_output = pool.imap_unordered(engine, self.file_list) + [self.errors.extend(output) for output in data_output if output] except Exception as e: - _log(f"Error {e}") pool.close() pool.join() - data_found_one.append(f"Error {e}") + _log(f"Error {e}") + self.errors.append(f"Error {e}") else: pool.close() pool.join() - self._find_duplicates(dirs_and_files) - self._find_shared_prefixes(lock) - [data_found_one.extend(output) for output in data_output if output] - - if len(data_found_one) > 0: - self.errors.extend(data_found_one) - - def _find_duplicates(self, dirs_and_files): - # TODO: re-evaluate dicts/for loops - for data_path, sub_dirs in dirs_and_files.items(): - # Creates a dict of filenames to list of full filepaths for each - # fastq file in a given data_path (dataset dir). - files_per_path = defaultdict(list) - for sub_path, filepaths in sub_dirs.items(): - for filepath in filepaths: - files_per_path[filepath.name].append(data_path / sub_path) - for filename, filepaths in files_per_path.items(): - if len(filepaths) > 1: + groups = self._make_groups() + if self._ungrouped_files: + _log(f"Ungrouped files, counts not checked: {self._ungrouped_files}") + self._find_counts(groups, lock) + + def _find_duplicates(self) -> None: + """ + Transforms data from {path: [filepaths]} to {filepath.name: [paths]} + to ensure that each filename only appears once in an upload + """ + files_per_path = defaultdict(list) + for filepath in self.file_list: + files_per_path[filepath.name].append(filepath.parents[0]) + for filename, filepaths in files_per_path.items(): + if len(filepaths) > 1: + self.errors.append( + _log( + f"{filename} has been found multiple times during this validation. " + f"Locations of duplicates: {str(filepaths)}." + ) + ) + + def _find_counts(self, groups: dict[filename_pattern, list[Path]], lock): + with lock: + for pattern, paths in groups.items(): + comparison = {} + for path in paths: + comparison[path] = self._file_record_counts.get(str(path)) + if not (len(set(comparison.values())) == 1): self.errors.append( - _log( - f"{filename} has been found multiple times during this validation. " - f"Locations of duplicates: {filepaths}." # noqa: E501 - ) + f"Counts do not match among files matching pattern {pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}" + ) + else: + _log( + f"PASSED: Record count comparison for files matching pattern {pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}" ) - def _find_shared_prefixes(self, lock): - # This pattern seeks out the string that includes the lane number (since - # that is expected to be present to help anchor the prefix) that comes - # before any of _I1, _I2, _R1, or _R2. - fastq_file_prefix_regex = re.compile(r"(.+_L\d+.*)_[IR][12][._]") - for fastq_file, records_read in self._file_record_counts.items(): - match = fastq_file_prefix_regex.match(Path(fastq_file).name) - with lock: - if match: - filename_prefix = match.group(1) - if filename_prefix in self._file_prefix_counts.keys(): - extant_count = self._file_prefix_counts[filename_prefix] - if extant_count != records_read: - # Find a file we've validated already that matches this - # prefix. - extant_files = [ - str(Path(filepath).name) - for filepath, record_count in self._file_record_counts.items() - if record_count == extant_count - and Path(filepath).name.startswith(filename_prefix) - ] - # Based on how the dictionaries are created, there should - # always be at least one matching filename. - assert extant_files - - self.errors.append( - _log( - f"{Path(fastq_file).name} ({records_read} lines) " - f"does not match length of {extant_files[0]} " - f"({extant_count} lines)." - ) - ) - else: - self._file_prefix_counts[filename_prefix] = records_read + def _printable_filenames( + self, files: Union[list, ListProxy, Path, str], newlines: bool = True + ): + if type(files) is list or type(files) is ListProxy: + file_list = [str(file) for file in files] + if newlines: + return "\n".join(file_list) + return file_list + elif type(files) is Path: + return str(files) + elif type(files) is str: + return files def main(): @@ -277,6 +298,7 @@ def main(): parser.add_argument( "filepaths", type=Path, nargs="+", help="Files to validate for FASTQ syntax" ) + parser.add_argument("coreuse", type=int, help="Number of cores to use") args = parser.parse_args() if isinstance(args.filepaths, List): @@ -289,7 +311,10 @@ def main(): raise Exception(f"Validator init received base_paths arg as type {type(args.filepaths)}") validator = FASTQValidatorLogic(True) - validator.validate_fastq_files_in_path(filepaths, Lock()) + if not (threads := args.coreuse): + cpus = cpu_count() + threads = cpus // 4 if cpus else 1 + validator.validate_fastq_files_in_path(filepaths, threads) if __name__ == "__main__": diff --git a/tests/test_fastq_validator_logic.py b/tests/test_fastq_validator_logic.py index 31bd68a..2a3e131 100644 --- a/tests/test_fastq_validator_logic.py +++ b/tests/test_fastq_validator_logic.py @@ -202,3 +202,94 @@ def test_fastq_validator_record_counts_bad(self, fastq_validator, tmp_path): assert "(4 lines)" in fastq_validator.errors[0] assert "does not match" in fastq_validator.errors[0] assert "(8 lines)" in fastq_validator.errors[0] + + def test_fastq_comparison_good(self, fastq_validator, tmp_path): + filenames = [ + "3252_ftL_RNA_T1_S31_L003_R1_001.fastq", + "3252_ftL_RNA_T1_S31_L003_R2_001.fastq", + "3252_ftL_RNA_T1_S31_L003_R1_002.fastq", + "3252_ftL_RNA_T1_S31_L003_R2_002.fastq", + ] + for filename in filenames: + new_file = tmp_path.joinpath(filename) + with _open_output_file(new_file, False) as output: + output.write(_GOOD_RECORDS) + + fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + + assert not fastq_validator.errors + + # def test_fastq_comparison_bad_extra_R(self, fastq_validator, tmp_path): + # filenames = [ + # "3252_ftL_RNA_T1_S31_L003_R1_001.fastq", + # "3252_ftL_RNA_T1_S31_L003_R2_001.fastq", + # "3252_ftL_RNA_T1_S31_L003_R1_002.fastq", + # "3252_ftL_RNA_T1_S31_L003_R2_002.fastq", + # "3252_ftL_RNA_T1_S31_L003_R3_001.fastq", + # ] + # for filename in filenames: + # new_file = tmp_path.joinpath(filename) + # with _open_output_file(new_file, False) as output: + # output.write(_GOOD_RECORDS) + # + # fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + # assert "IndexError: list index out of range" in fastq_validator.errors[0] + + # def test_fastq_comparison_bad_unpaired_R(self, fastq_validator, tmp_path): + # filenames = [ + # "3252_ftL_RNA_T1_S31_L003_R1_001.fastq", + # "3252_ftL_RNA_T1_S31_L003_R2_001.fastq", + # "3252_ftL_RNA_T1_S31_L003_R1_002.fastq", + # ] + # for filename in filenames: + # new_file = tmp_path.joinpath(filename) + # with _open_output_file(new_file, False) as output: + # output.write(_GOOD_RECORDS) + # + # fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + # + # assert fastq_validator.errors + # + # def test_fastq_comparison_bad_mixed_I_and_R(self, fastq_validator, tmp_path): + # filenames = [ + # "3252_ftL_RNA_T1_S31_L003_R1_001.fastq", + # "3252_ftL_RNA_T1_S31_L003_R2_001.fastq", + # "3252_ftL_RNA_T1_S31_L003_I1_002.fastq", + # ] + # for filename in filenames: + # new_file = tmp_path.joinpath(filename) + # with _open_output_file(new_file, False) as output: + # output.write(_GOOD_RECORDS) + # + # fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + # + # assert fastq_validator.errors + # + # def test_fastq_comparison_bad_extra_unmatched_fastq(self, fastq_validator, tmp_path): + # filenames = [ + # "3252_ftL_RNA_T1_S31_L003_R1_001.fastq", + # "3252_ftL_RNA_T1_S31_L003_R2_001.fastq", + # "bad_ftL_RNA_T1_S31_L003_R1_001.fastq", + # ] + # for filename in filenames: + # new_file = tmp_path.joinpath(filename) + # with _open_output_file(new_file, False) as output: + # output.write(_GOOD_RECORDS) + # + # fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + # + # assert fastq_validator.errors + + def test_fastq_comparison_bad_unequal_line_counts(self, fastq_validator, tmp_path): + good_file = "3252_ftL_RNA_T1_S31_L003_R1_001.fastq" + bad_file = "3252_ftL_RNA_T1_S31_L003_R2_001.fastq" + new_good_file = tmp_path.joinpath(good_file) + with _open_output_file(new_good_file, False) as output: + output.write(_GOOD_RECORDS) + new_bad_file = tmp_path.joinpath(bad_file) + with _open_output_file(new_bad_file, False) as output: + output.write("bad") + + fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + + assert fastq_validator.errors