Commit 931714e

functionality seems fixed, need to fix verbosity and tests

gesinaphillips committed Oct 16, 2024
1 parent 4a196e7
Showing 3 changed files with 208 additions and 93 deletions.
7 changes: 3 additions & 4 deletions src/ingest_validation_tests/fastq_validator.py
@@ -1,4 +1,3 @@
-from os import cpu_count
 from typing import List, Optional
 
 from fastq_validator_logic import FASTQValidatorLogic, _log
@@ -11,10 +10,10 @@ class FASTQValidator(Validator):
     version = "1.0"
 
     def collect_errors(self, **kwargs) -> List[Optional[str]]:
-        threads = kwargs.get("coreuse", None) or cpu_count() // 4 or 1
-        _log(f"Threading at FastQValidator with {threads}")
+        del kwargs
+        _log(f"Threading at FastQValidator with {self.thread_count}")
         validator = FASTQValidatorLogic(verbose=True)
-        validator.validate_fastq_files_in_path(self.paths, threads)
+        validator.validate_fastq_files_in_path(self.paths, self.thread_count)
         if validator.errors:
             return validator.errors
         elif validator.files_were_found:
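collect_errors now leans on self.thread_count instead of computing a thread count locally. A minimal sketch of the assumption this makes about the base class (hypothetical code; the real Validator is defined in ingest-validation-tools):

    from os import cpu_count

    class Validator:
        # Hypothetical sketch: assumes the base class now derives
        # thread_count from a coreuse argument, mirroring the arithmetic
        # removed from collect_errors above.
        def __init__(self, paths, coreuse=None, **kwargs):
            self.paths = paths
            cpus = cpu_count() or 1
            self.thread_count = coreuse or cpus // 4 or 1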
203 changes: 114 additions & 89 deletions src/ingest_validation_tests/fastq_validator_logic.py
@@ -2,32 +2,51 @@
 import gzip
 import logging
 import re
-from collections import defaultdict
-from multiprocessing import Lock, Manager, Pool
+from collections import defaultdict, namedtuple
+from multiprocessing import Manager, Pool
+from multiprocessing.managers import ListProxy
+from os import cpu_count
 from pathlib import Path
-from typing import Callable, List, Optional, TextIO
+from typing import Callable, List, Optional, TextIO, Union
 
 import fastq_utils
+from typing_extensions import Self
+
+filename_pattern = namedtuple("filename_pattern", ["prefix", "read_type", "set_num"])
 
 
 def is_valid_filename(filename: str) -> bool:
     return bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename))
 
 
+def get_prefix_read_type_and_set(filename: str) -> Optional[filename_pattern]:
+    if not bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename)):
+        return
+    pattern = re.compile(
+        r"(?P<prefix>.*(?=[_](?:(?P<read_type>R|.read|I)\d)))(?:_(?P=read_type)\d_)(?P<set>\d+)"
+    )
+    groups = pattern.match(filename)
+    if groups and all(x in groups.groupdict().keys() for x in ["prefix", "read_type", "set"]):
+        return filename_pattern(
+            groups.group("prefix"), groups.group("read_type"), groups.group("set")
+        )
+
+
 def _open_fastq_file(file: Path) -> TextIO:
     return gzip.open(file, "rt") if file.name.endswith(".gz") else file.open()
 
 
-def _log(message: str) -> str:
-    print(message)
-    return message
+def _log(message: str, verbose: bool = True) -> Optional[str]:
+    if verbose:
+        print(message)
+    return message
 
 
 class Engine(object):
     def __init__(self, validate_object):
         self.validate_object = validate_object
 
-    def __call__(self, fastq_file):
+    def __call__(self, fastq_file) -> list[Optional[str]]:
         errors = []
         _log(f"Validating matching fastq file {fastq_file}")
         self.validate_object.validate_fastq_file(fastq_file)
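For illustration, the new get_prefix_read_type_and_set helper decomposes Illumina-style names into (prefix, read_type, set_num); the filenames below are hypothetical and assume fastq_utils.FASTQ_PATTERN accepts them:

    get_prefix_read_type_and_set("sample1_L001_R1_001.fastq.gz")
    # -> filename_pattern(prefix='sample1_L001', read_type='R', set_num='001')
    get_prefix_read_type_and_set("sample1_L001_I1_001.fastq.gz")
    # -> filename_pattern(prefix='sample1_L001', read_type='I', set_num='001')
    get_prefix_read_type_and_set("reads.fastq")
    # -> None: no _R1_/_I1_-style segment, so the file is treated as ungrouped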
@@ -56,8 +75,9 @@ class FASTQValidatorLogic:
     def __init__(self, verbose=False):
         self.errors: List[Optional[str]] = []
         self.files_were_found = False
+        self.file_list = Manager().list()
         self._file_record_counts = Manager().dict()
-        self._file_prefix_counts = Manager().dict()
+        self._ungrouped_files = Manager().list()
         self._filename = ""
         self._line_number = 0
 
@@ -73,8 +93,7 @@ def _format_error(self, error: str) -> str:
 
         message = f"{location}: {error}"
 
-        if self._verbose:
-            _log(message)
+        print(message)
         return message
 
     def _validate_fastq_line_1(self, line: str) -> List[str]:
@@ -113,6 +132,16 @@ def _validate_fastq_line_4(self, line: str) -> List[str]:
         )
         return errors
 
+    def _make_groups(self) -> dict[filename_pattern, list[Path]]:
+        groups = defaultdict(list)
+        for file in self.file_list:
+            potential_match = get_prefix_read_type_and_set(file.name)
+            if potential_match:
+                groups[potential_match].append(file)
+            else:
+                self._ungrouped_files.append(file)
+        return groups
+
     _VALIDATE_FASTQ_LINE_METHODS = {
         1: _validate_fastq_line_1,
         2: _validate_fastq_line_2,
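Since the digit after the read-type letter is excluded from the key, R1/R2 (or I1/I2) files from the same lane and set fall into a single group, which is what makes the later record-count comparison possible. A sketch with hypothetical paths:

    # self.file_list: data/sample1_L001_R1_001.fastq.gz,
    #                 data/sample1_L001_R2_001.fastq.gz, data/unmatched.fastq
    # self._make_groups() ->
    #   {filename_pattern(prefix='sample1_L001', read_type='R', set_num='001'):
    #       [Path('data/sample1_L001_R1_001.fastq.gz'),
    #        Path('data/sample1_L001_R2_001.fastq.gz')]}
    # unmatched.fastq is appended to self._ungrouped_files instead.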
@@ -123,9 +152,9 @@ def _validate_fastq_line_4(self, line: str) -> List[str]:
     def validate_fastq_record(self, line: str, line_number: int) -> List[str]:
         line_index = line_number % 4 + 1
 
-        validator_method: Callable[[FASTQValidatorLogic, str], List[str]] = (
-            self._VALIDATE_FASTQ_LINE_METHODS[line_index]
-        )
+        validator_method: Callable[[Self, str], List[str]] = self._VALIDATE_FASTQ_LINE_METHODS[
+            line_index
+        ]
 
         assert validator_method, f"No validator method defined for record index {line_index}"
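line_number is zero-based, so the arithmetic maps each position within a four-line FASTQ record to its validator:

    # line_number % 4 + 1 -> 1: @sequence identifier, 2: bases,
    #                        3: '+' separator line, 4: quality scores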

@@ -179,104 +208,97 @@ def validate_fastq_file(self, fastq_file: Path) -> None:
         self._file_record_counts[str(fastq_file)] = records_read
 
     def validate_fastq_files_in_path(self, paths: List[Path], threads: int) -> None:
-        data_found_one = []
-        dirs_and_files = defaultdict(dict)
+        """
+        - Builds a list of filepaths; checks for duplicate filenames in upload.
+        - [parallel] Opens, validates, and gets line count of each file in list, and then
+          populates self._file_record_counts as {filepath: record_count}.
+        - If successful, groups files with matching prefix/read_type/set_num values.
+        - Compares record_counts across grouped files, logs any that don't match or are ungrouped.
+        """
         for path in paths:
-            dirs_and_files[path] = fastq_utils.collect_fastq_files_by_directory(path)
-            _log(
-                f"FASTQValidatorLogic: Added files from {path} to dirs_and_files: {dirs_and_files}"
-            )
-        file_list = []
+            fastq_utils_output = fastq_utils.collect_fastq_files_by_directory(path)
+            for files in fastq_utils_output.values():
+                self.file_list.extend(files)
+                _log(f"FASTQValidatorLogic: Added files from {path} to file_list: {files}")
+        self.files_were_found = bool(self.file_list)
+        self._find_duplicates()
+        _log(f"File list: {self.file_list}")
         with Manager() as manager:
-            # TODO: re-evaluate dicts/for loops
             lock = manager.Lock()
-            for path, rel_paths in dirs_and_files.items():
-                for rel_path, files in rel_paths.items():
-                    for file in files:
-                        file_list.append(Path(path / rel_path / file))
-            if file_list:
-                self.files_were_found = True
+            pool = Pool(threads)
             try:
                 logging.info(
-                    f"Passing file list for paths {paths} to engine. File list: {file_list}."
+                    f"Passing file list for paths {self._printable_filenames(paths, newlines=False)} to engine. File list:"
                 )
-                pool = Pool(threads)
+                logging.info(self._printable_filenames(self.file_list))
                 engine = Engine(self)
-                data_output = pool.imap_unordered(engine, file_list)
+                data_output = pool.imap_unordered(engine, self.file_list)
+                [self.errors.extend(output) for output in data_output if output]
             except Exception as e:
-                _log(f"Error {e}")
-                pool.close()
-                pool.join()
-                data_found_one.append(f"Error {e}")
+                _log(f"Error {e}")
+                self.errors.append(f"Error {e}")
             else:
                 pool.close()
                 pool.join()
-                self._find_duplicates(dirs_and_files)
-                self._find_shared_prefixes(lock)
-                [data_found_one.extend(output) for output in data_output if output]
-
-        if len(data_found_one) > 0:
-            self.errors.extend(data_found_one)
-
-    def _find_duplicates(self, dirs_and_files):
-        # TODO: re-evaluate dicts/for loops
-        for data_path, sub_dirs in dirs_and_files.items():
-            # Creates a dict of filenames to list of full filepaths for each
-            # fastq file in a given data_path (dataset dir).
-            files_per_path = defaultdict(list)
-            for sub_path, filepaths in sub_dirs.items():
-                for filepath in filepaths:
-                    files_per_path[filepath.name].append(data_path / sub_path)
-            for filename, filepaths in files_per_path.items():
-                if len(filepaths) > 1:
-                    self.errors.append(
-                        _log(
-                            f"{filename} has been found multiple times during this validation. "
-                            f"Locations of duplicates: {filepaths}."  # noqa: E501
-                        )
-                    )
-
-    def _find_shared_prefixes(self, lock):
-        # This pattern seeks out the string that includes the lane number (since
-        # that is expected to be present to help anchor the prefix) that comes
-        # before any of _I1, _I2, _R1, or _R2.
-        fastq_file_prefix_regex = re.compile(r"(.+_L\d+.*)_[IR][12][._]")
-        for fastq_file, records_read in self._file_record_counts.items():
-            match = fastq_file_prefix_regex.match(Path(fastq_file).name)
-            with lock:
-                if match:
-                    filename_prefix = match.group(1)
-                    if filename_prefix in self._file_prefix_counts.keys():
-                        extant_count = self._file_prefix_counts[filename_prefix]
-                        if extant_count != records_read:
-                            # Find a file we've validated already that matches this
-                            # prefix.
-                            extant_files = [
-                                str(Path(filepath).name)
-                                for filepath, record_count in self._file_record_counts.items()
-                                if record_count == extant_count
-                                and Path(filepath).name.startswith(filename_prefix)
-                            ]
-                            # Based on how the dictionaries are created, there should
-                            # always be at least one matching filename.
-                            assert extant_files
-
-                            self.errors.append(
-                                _log(
-                                    f"{Path(fastq_file).name} ({records_read} lines) "
-                                    f"does not match length of {extant_files[0]} "
-                                    f"({extant_count} lines)."
-                                )
-                            )
-                    else:
-                        self._file_prefix_counts[filename_prefix] = records_read
+                groups = self._make_groups()
+                if self._ungrouped_files:
+                    _log(f"Ungrouped files, counts not checked: {self._ungrouped_files}")
+                self._find_counts(groups, lock)
+
+    def _find_duplicates(self) -> None:
+        """
+        Transforms data from {path: [filepaths]} to {filepath.name: [paths]}
+        to ensure that each filename only appears once in an upload
+        """
+        files_per_path = defaultdict(list)
+        for filepath in self.file_list:
+            files_per_path[filepath.name].append(filepath.parents[0])
+        for filename, filepaths in files_per_path.items():
+            if len(filepaths) > 1:
+                self.errors.append(
+                    _log(
+                        f"{filename} has been found multiple times during this validation. "
+                        f"Locations of duplicates: {str(filepaths)}."
+                    )
+                )
+
+    def _find_counts(self, groups: dict[filename_pattern, list[Path]], lock):
+        with lock:
+            for pattern, paths in groups.items():
+                comparison = {}
+                for path in paths:
+                    comparison[path] = self._file_record_counts.get(str(path))
+                if not (len(set(comparison.values())) == 1):
+                    self.errors.append(
+                        _log(
+                            f"Counts do not match among files matching pattern {pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}"
+                        )
+                    )
+                else:
+                    _log(
+                        f"PASSED: Record count comparison for files matching pattern {pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}"
+                    )
+
+    def _printable_filenames(
+        self, files: Union[list, ListProxy, Path, str], newlines: bool = True
+    ):
+        if type(files) is list or type(files) is ListProxy:
+            file_list = [str(file) for file in files]
+            if newlines:
+                return "\n".join(file_list)
+            return file_list
+        elif type(files) is Path:
+            return str(files)
+        elif type(files) is str:
+            return files
 
 
 def main():
     parser = argparse.ArgumentParser(description="Validate FASTQ files.")
     parser.add_argument(
         "filepaths", type=Path, nargs="+", help="Files to validate for FASTQ syntax"
     )
+    parser.add_argument("coreuse", type=int, help="Number of cores to use")
 
     args = parser.parse_args()
     if isinstance(args.filepaths, List):
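Taken together with _make_groups, a mismatched pair surfaces roughly like this (hypothetical record counts):

    # Suppose the R1/R2 group above validated with unequal counts:
    # self._file_record_counts == {'data/sample1_L001_R1_001.fastq.gz': 400,
    #                              'data/sample1_L001_R2_001.fastq.gz': 399}
    # comparison then holds two distinct values, so _find_counts appends:
    #   "Counts do not match among files matching pattern sample1_L001_R#_001: {...}"
    # With equal counts it prints the PASSED message and records no error.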
@@ -289,7 +311,10 @@ def main():
         raise Exception(f"Validator init received base_paths arg as type {type(args.filepaths)}")
 
     validator = FASTQValidatorLogic(True)
-    validator.validate_fastq_files_in_path(filepaths, Lock())
+    if not (threads := args.coreuse):
+        cpus = cpu_count()
+        threads = cpus // 4 if cpus else 1
+    validator.validate_fastq_files_in_path(filepaths, threads)
 
 
 if __name__ == "__main__":
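A local run of the rewritten main() looks something like this (hypothetical paths; argparse assigns the trailing positional to coreuse after filling the nargs="+" filepaths):

    # python fastq_validator_logic.py data/a_L001_R1_001.fastq.gz data/a_L001_R2_001.fastq.gz 2
    #   -> validates both files with 2 worker processes
    # python fastq_validator_logic.py data/a_L001_R1_001.fastq.gz 0
    #   -> coreuse 0 is falsy, so threads falls back to cpu_count() // 4 (or 1)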