From b1707bf3f36b96c982e743600c45d3286f34b96c Mon Sep 17 00:00:00 2001 From: Marcel Zwiers Date: Sun, 8 Dec 2024 21:07:37 +0100 Subject: [PATCH] Re-Refactoring of IO helper functions (`bidscoin.plugins` -> `bidscoin.utilities`) --- bidscoin/bids.py | 4 +- bidscoin/bidscoiner.py | 2 +- bidscoin/bidsmapper.py | 2 +- bidscoin/plugins/__init__.py | 551 +----------------------- bidscoin/plugins/dcm2niix2bids.py | 19 +- bidscoin/plugins/nibabel2bids.py | 3 - bidscoin/plugins/spec2nii2bids.py | 5 +- bidscoin/utilities/__init__.py | 562 ++++++++++++++++++++++++- bidscoin/utilities/bidsparticipants.py | 2 +- bidscoin/utilities/dicomsort.py | 4 +- bidscoin/utilities/rawmapper.py | 2 +- tests/test_plugins.py | 89 ---- tests/test_utilities.py | 88 ++++ 13 files changed, 664 insertions(+), 669 deletions(-) diff --git a/bidscoin/bids.py b/bidscoin/bids.py index a45e4072..f60fa52a 100644 --- a/bidscoin/bids.py +++ b/bidscoin/bids.py @@ -19,17 +19,15 @@ from fnmatch import fnmatch from pathlib import Path from typing import Union, Any, Iterable, NewType -from pydicom import config from importlib.util import find_spec if find_spec('bidscoin') is None: import sys sys.path.append(str(Path(__file__).parents[1])) -from bidscoin import bcoin, schemafolder, templatefolder, lsdirs, is_hidden, __version__, DEBUG +from bidscoin import bcoin, schemafolder, templatefolder, is_hidden, __version__ from bidscoin.plugins import EventsParser from ruamel.yaml import YAML yaml = YAML() yaml.representer.ignore_aliases = lambda *data: True # Expand aliases (https://stackoverflow.com/questions/58091449/disabling-alias-for-yaml-file-in-python) -config.INVALID_KEY_BEHAVIOR = 'IGNORE' # Define custom data types (replace with proper classes or TypeAlias of Python >= 3.10) Plugin = NewType('Plugin', dict[str, Any]) diff --git a/bidscoin/bidscoiner.py b/bidscoin/bidscoiner.py index 1ee0f689..e9f6e0ad 100755 --- a/bidscoin/bidscoiner.py +++ b/bidscoin/bidscoiner.py @@ -21,7 +21,7 @@ if find_spec('bidscoin') is None: sys.path.append(str(Path(__file__).parents[1])) from bidscoin import bcoin, bids, lsdirs, bidsversion, trackusage, __version__, DEBUG -from bidscoin.plugins import unpack +from bidscoin.utilities import unpack def bidscoiner(sourcefolder: str, bidsfolder: str, participant: list=(), force: bool=False, bidsmap: str='bidsmap.yaml', cluster: str='') -> None: diff --git a/bidscoin/bidsmapper.py b/bidscoin/bidsmapper.py index b4f56574..cbd6aa7b 100755 --- a/bidscoin/bidsmapper.py +++ b/bidscoin/bidsmapper.py @@ -20,7 +20,7 @@ sys.path.append(str(Path(__file__).parents[1])) from bidscoin import bcoin, lsdirs, trackusage, check_version, __version__ from bidscoin.bids import BidsMap -from bidscoin.plugins import unpack +from bidscoin.utilities import unpack _, uptodate, versionmessage = check_version() diff --git a/bidscoin/plugins/__init__.py b/bidscoin/plugins/__init__.py index a06f5ae6..2fc0c9c6 100644 --- a/bidscoin/plugins/__init__.py +++ b/bidscoin/plugins/__init__.py @@ -3,18 +3,10 @@ import logging import copy import pandas as pd -import tempfile -import warnings -import re -import shutil from pathlib import Path from abc import ABC, abstractmethod from typing import Union -from pydicom import dcmread, fileset -from bidscoin import is_hidden, lsdirs, DEBUG -from bidscoin.utilities import dicomsort -from functools import lru_cache -from importlib.util import find_spec +from bidscoin import is_hidden from typing import TYPE_CHECKING if TYPE_CHECKING: from bidscoin.bids import BidsMap # = Circular import @@ -269,544 +261,3 @@ def write(self, targetfile: Path): """Write the eventstable to a BIDS events.tsv file""" self.eventstable.to_csv(targetfile, sep='\t', index=False) - - -def unpack(sesfolder: Path, wildcard: str='', workfolder: Path='', _subprefix: Union[str,None]='') -> tuple[set[Path], bool]: - """ - Unpacks and sorts DICOM files in sourcefolder to a temporary folder if sourcefolder contains a DICOMDIR file or .tar.gz, .gz or .zip files - - :param sesfolder: The full pathname of the folder with the source data - :param wildcard: A glob search pattern to select the tarball/zipped files (leave empty to skip unzipping) - :param workfolder: A root folder for temporary data - :param _subprefix: A pytest helper variable that is passed to dicomsort.sortsessions(args, subprefix=_subprefix) - :return: Either ({a set of unpacked session folders}, True), or ({sourcefolder}, False) - """ - - # Search for zipped/tarball files - tarzipfiles = [*sesfolder.glob(wildcard)] if wildcard else [] - - # See if we have a flat unsorted (DICOM) data organization, i.e. no directories, but DICOM-files - flatDICOM = not lsdirs(sesfolder) and get_dicomfile(sesfolder).is_file() - - # Check if we are going to do unpacking and/or sorting - if tarzipfiles or flatDICOM or (sesfolder/'DICOMDIR').is_file(): - - if tarzipfiles: - LOGGER.info(f"Found zipped/tarball data in: {sesfolder}") - else: - LOGGER.info(f"Detected a {'flat' if flatDICOM else 'DICOMDIR'} data-structure in: {sesfolder}") - - # Create a (temporary) sub/ses workfolder for unpacking the data - if not workfolder: - workfolder = Path(tempfile.mkdtemp(dir=tempfile.gettempdir())) - else: - workfolder = Path(workfolder)/next(tempfile._get_candidate_names()) - worksesfolder = workfolder/sesfolder.relative_to(sesfolder.anchor) - worksesfolder.mkdir(parents=True, exist_ok=True) - - # Copy everything over to the workfolder - LOGGER.info(f"Making temporary copy: {sesfolder} -> {worksesfolder}") - shutil.copytree(sesfolder, worksesfolder, dirs_exist_ok=True) - - # Unpack the zip/tarball files in the temporary folder - sessions: set[Path] = set() - for tarzipfile in [worksesfolder/tarzipfile.name for tarzipfile in tarzipfiles]: - LOGGER.info(f"Unpacking: {tarzipfile.name} -> {worksesfolder}") - try: - shutil.unpack_archive(tarzipfile, worksesfolder) - except Exception as unpackerror: - LOGGER.warning(f"Could not unpack: {tarzipfile}\n{unpackerror}") - continue - - # Sort the DICOM files in the worksesfolder root immediately (to avoid name collisions) - if not (worksesfolder/'DICOMDIR').is_file() and get_dicomfile(worksesfolder).name: - sessions.update(dicomsort.sortsessions(worksesfolder, _subprefix, recursive=False)) - - # Sort the DICOM files if not sorted yet (e.g. DICOMDIR) - sessions.update(dicomsort.sortsessions(worksesfolder, _subprefix, recursive=True)) - - return sessions, True - - else: - - return {sesfolder}, False - - -@lru_cache(maxsize=65536) -def is_dicomfile(file: Path) -> bool: - """ - Checks whether a file is a DICOM-file. As a quick test, it first uses the feature that Dicoms have the - string DICM hardcoded at offset 0x80. If that fails and (the file has a normal DICOM extension, e.g. '.dcm') - then it is tested whether pydicom can read the file - - :param file: The full pathname of the file - :return: Returns true if a file is a DICOM-file - """ - - if file.is_file(): - - # Perform a quick test first - with file.open('rb') as dicomfile: - dicomfile.seek(0x80, 1) - if dicomfile.read(4) == b'DICM': - return True - - # Perform a proof of the pudding test (but avoid memory problems when reading a very large (e.g. EEG) source file) - if file.suffix.lower() in ('.ima','.dcm','.dicm','.dicom',''): - if file.name == 'DICOMDIR': - return True - dicomdata = dcmread(file, force=True) # The DICM tag may be missing for anonymized DICOM files. NB: Force=False raises an error for non-DICOM files - return 'Modality' in dicomdata - - return False - - -@lru_cache(maxsize=65536) -def is_parfile(file: Path) -> bool: - """ - Rudimentary check (on file extensions and whether it exists) whether a file is a Philips PAR file - - :param file: The full pathname of the file - :return: Returns true if a file is a Philips PAR-file - """ - - # TODO: Implement a proper check, e.g. using nibabel - try: - if file.is_file() and file.suffix.lower() == '.par' and '# CLINICAL TRYOUT' in file.read_text(): - return True - elif file.is_file() and file.suffix.lower() == '.xml': - return True - except (OSError, IOError, UnicodeDecodeError) as ioerror: - pass - - return False - - -def get_dicomfile(folder: Path, index: int=0) -> Path: - """ - Gets a dicom-file from the folder (supports DICOMDIR) - - :param folder: The full pathname of the folder - :param index: The index number of the dicom file - :return: The filename of the first dicom-file in the folder. - """ - - if is_hidden(Path(folder.name)): - return Path() - - if (folder/'DICOMDIR').is_file(): - dicomdir = fileset.FileSet(folder/'DICOMDIR') - files = [Path(file.path) for file in dicomdir] - else: - files = sorted(folder.iterdir()) - - idx = 0 - for file in files: - if not is_hidden(file.relative_to(folder)) and is_dicomfile(file): - if idx == index: - return file - else: - idx += 1 - - return Path() - - -def get_parfiles(folder: Path) -> list[Path]: - """ - Gets the Philips PAR-file from the folder - - :param folder: The full pathname of the folder - :return: The filenames of the PAR-files in the folder. - """ - - if is_hidden(Path(folder.name)): - return [] - - parfiles: list[Path] = [] - for file in sorted(folder.iterdir()): - if not is_hidden(file.relative_to(folder)) and is_parfile(file): - parfiles.append(file) - - return parfiles - - -@lru_cache(maxsize=65536) -def parse_x_protocol(pattern: str, dicomfile: Path) -> str: - """ - Siemens writes a protocol structure as text into each DICOM file. - This structure is necessary to recreate a scanning protocol from a DICOM, - since the DICOM information alone wouldn't be sufficient. - - This function is derived from dac2bids.py from Daniel Gomez 29.08.2016 - https://github.com/dangom/dac2bids/blob/master/dac2bids.py - - :param pattern: A regular expression: '^' + pattern + '\t = \t(.*)\\n' - :param dicomfile: The full pathname of the dicom-file - :return: The string extracted values from the dicom-file according to the given pattern - """ - - regexp = '^' + pattern + '\t = \t(.*)\n' - regex = re.compile(regexp.encode('utf-8')) - - with dicomfile.open('rb') as openfile: - for line in openfile: - match = regex.match(line) - if match: - return match.group(1).decode('utf-8') - - LOGGER.warning(f"Pattern: '{regexp.encode('unicode_escape').decode()}' not found in: {dicomfile}") - return '' - - -# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) _DICOMDICT_CACHE optimization -_DICOMDICT_CACHE = _DICOMFILE_CACHE = None -@lru_cache(maxsize=65536) -def get_dicomfield(tagname: str, dicomfile: Path) -> Union[str, int]: - """ - Robustly extracts a DICOM attribute/tag value from a dictionary or from vendor specific fields. - - A XA-20 enhanced DICOM hack is made, i.e. if `EchoNumbers` is empty then an attempt is made to - read it from the ICE dims (see https://github.com/rordenlab/dcm2niix/blob/master/Siemens/README.md) - - Another hack is to get 'PhaseEncodingDirection` (see https://neurostars.org/t/determining-bids-phaseencodingdirection-from-dicom/612/10) - - :param tagname: DICOM attribute name (e.g. 'SeriesNumber') or Pydicom-style tag number (e.g. '0x00200011', '(0x20,0x11)', '(0020,0011)') - :param dicomfile: The full pathname of the dicom-file - :return: Extracted tag-values as a flat string - """ - - global _DICOMDICT_CACHE, _DICOMFILE_CACHE - - # Skip the RunItem properties - if tagname in ('provenance', 'properties', 'attributes', 'bids', 'meta', 'events'): # = RunItem().properties but that creates a circular import - return '' - - if not dicomfile.is_file(): - LOGGER.warning(f"{dicomfile} not found") - value = '' - - elif not is_dicomfile(dicomfile): - LOGGER.warning(f"{dicomfile} is not a DICOM file, cannot read {tagname}") - value = '' - - else: - with warnings.catch_warnings(): - if not DEBUG: warnings.simplefilter('ignore', UserWarning) - from nibabel.nicom import csareader - try: - if dicomfile != _DICOMFILE_CACHE: - if dicomfile.name == 'DICOMDIR': - LOGGER.bcdebug(f"Getting DICOM fields from {dicomfile} seems dysfunctional (will raise dcmread error below if pydicom => v3.0)") - dicomdata = dcmread(dicomfile, force=True) # The DICM tag may be missing for anonymized DICOM files - _DICOMDICT_CACHE = dicomdata - _DICOMFILE_CACHE = dicomfile - else: - dicomdata = _DICOMDICT_CACHE - - if re.fullmatch(r'\(?0x[\dA-F]*,?(0x)?[\dA-F]*\)?', tagname): # Try Pydicom's hexadecimal tag number first (must be a 2-tuple or int) - value = eval(f"dicomdata[{tagname}].value") # NB: This may generate e.g. UserWarning: Invalid value 'filepath' used with the 'in' operator: must be an element tag as a 2-tuple or int, or an element keyword - else: - value = dicomdata.get(tagname,'') if tagname in dicomdata else '' # Then try and see if it is an attribute name. NB: Do not use dicomdata.get(tagname, '') to avoid using its class attributes (e.g. 'filename') - - # Try a recursive search - if not value and value != 0: - for elem in dicomdata.iterall(): - if tagname in (elem.name, elem.keyword, str(elem.tag), str(elem.tag).replace(', ',',')): - value = elem.value - break - - if dicomdata.get('Modality') == 'MR': - - # PhaseEncodingDirection patch (see https://neurostars.org/t/determining-bids-phaseencodingdirection-from-dicom/612/10) - if tagname == 'PhaseEncodingDirection' and not value: - if 'SIEMENS' in dicomdata.get('Manufacturer').upper() and csareader.get_csa_header(dicomdata): - csa = csareader.get_csa_header(dicomdata, 'Image')['tags'] - pos = csa.get('PhaseEncodingDirectionPositive',{}).get('items') # = [0] or [1] - dir = dicomdata.get('InPlanePhaseEncodingDirection') # = ROW or COL - if dir == 'COL' and pos: - value = 'AP' if pos[0] else 'PA' - elif dir == 'ROW' and pos: - value = 'LR' if pos[0] else 'RL' - elif dicomdata.get('Manufacturer','').upper().startswith('GE'): - value = dicomdata.get('RectilinearPhaseEncodeReordering') # = LINEAR or REVERSE_LINEAR - - # XA enhanced DICOM hack: Catch missing EchoNumbers from the private ICE_Dims field (0x21, 0x1106) - if tagname in ('EchoNumber', 'EchoNumbers') and not value: - for elem in dicomdata.iterall(): - if elem.tag == (0x21,0x1106): - value = elem.value.split('_')[1] if '_' in elem.value else '' - LOGGER.bcdebug(f"Parsed `EchoNumber(s)` from Siemens ICE_Dims `{elem.value}` as: {value}") - break - - # Try reading the Siemens CSA header. For VA/VB-versions the CSA header tag is (0029,1020), for XA-versions (0021,1019). TODO: see if dicom_parser is supporting this - if not value and value != 0 and 'SIEMENS' in dicomdata.get('Manufacturer').upper() and csareader.get_csa_header(dicomdata): - - if find_spec('dicom_parser'): - from dicom_parser import Image - LOGGER.bcdebug(f"Parsing {tagname} from the CSA header using `dicom_parser`") - for csa in ('CSASeriesHeaderInfo', 'CSAImageHeaderInfo'): - value = value if (value or value==0) else Image(dicomfile).header.get(csa) - for csatag in tagname.split('.'): # E.g. CSA tagname = 'SliceArray.Slice.instance_number.Position.Tra' - if isinstance(value, dict): # Final CSA header attributes in dictionary of dictionaries - value = value.get(csatag, '') - if 'value' in value: # Normal CSA (i.e. not MrPhoenixProtocol) - value = value['value'] - if value != 0: - value = str(value or '') - - else: - LOGGER.bcdebug(f"Parsing {tagname} from the CSA header using `nibabel`") - for modality in ('Series', 'Image'): - value = value if (value or value==0) else csareader.get_csa_header(dicomdata, modality)['tags'] - for csatag in tagname.split('.'): # NB: Currently MrPhoenixProtocol is not supported - if isinstance(value, dict): # Final CSA header attributes in dictionary of dictionaries - value = value.get(csatag, {}).get('items', '') - if isinstance(value, list) and len(value) == 1: - value = value[0] - if value != 0: - value = str(value or '') - - if not value and value != 0 and 'Modality' not in dicomdata: - raise ValueError(f"Missing mandatory DICOM 'Modality' field in: {dicomfile}") - - except (IOError, OSError) as ioerror: - LOGGER.warning(f"Cannot read {tagname} from {dicomfile}\n{ioerror}") - value = '' - - except Exception as dicomerror: - LOGGER.warning(f"Could not read {tagname} from {dicomfile}\n{dicomerror}") - value = '' - - # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) - if isinstance(value, int): - return int(value) - elif value is None: - return '' - else: - return str(value) # If it's a MultiValue type then flatten it - - -# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) cache optimization -_TWIXHDR_CACHE = _TWIXFILE_CACHE = None -@lru_cache(maxsize=65536) -def get_twixfield(tagname: str, twixfile: Path, mraid: int=2) -> Union[str, int]: - """ - Recursively searches the TWIX file to extract the field value - - :param tagname: Name of the TWIX field - :param twixfile: The full pathname of the TWIX file - :param mraid: The mapVBVD argument for selecting the multiraid file to load (default = 2, i.e. 2nd file) - :return: Extracted tag-values from the TWIX file - """ - - global _TWIXHDR_CACHE, _TWIXFILE_CACHE - - if not twixfile.is_file(): - LOGGER.error(f"{twixfile} not found") - value = '' - - else: - try: - if twixfile != _TWIXFILE_CACHE: - - from mapvbvd import mapVBVD - - twixObj = mapVBVD(twixfile, quiet=True) - if isinstance(twixObj, list): - twixObj = twixObj[mraid - 1] - hdr = twixObj['hdr'] - _TWIXHDR_CACHE = hdr - _TWIXFILE_CACHE = twixfile - else: - hdr = _TWIXHDR_CACHE - - def iterget(item, key): - if isinstance(item, dict): - - # First check to see if we can get the key-value data from the item - val = item.get(key, '') - if val and not isinstance(val, dict): - return val - - # Loop over the item to see if we can get the key-value from the sub-items - if isinstance(item, dict): - for ds in item: - val = iterget(item[ds], key) - if val: - return val - - return '' - - value = iterget(hdr, tagname) - - except (IOError, OSError): - LOGGER.warning(f'Cannot read {tagname} from {twixfile}') - value = '' - - except Exception as twixerror: - LOGGER.warning(f'Could not parse {tagname} from {twixfile}\n{twixerror}') - value = '' - - # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) - if isinstance(value, int): - return int(value) - elif value is None: - return '' - else: - return str(value) # If it's a MultiValue type then flatten it - - -# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) _PARDICT_CACHE optimization -_PARDICT_CACHE = _PARFILE_CACHE = None -@lru_cache(maxsize=65536) -def get_parfield(tagname: str, parfile: Path) -> Union[str, int]: - """ - Uses nibabel to extract the value from a PAR field (NB: nibabel does not yet support XML) - - :param tagname: Name of the PAR/XML field - :param parfile: The full pathname of the PAR/XML file - :return: Extracted tag-values from the PAR/XML file - """ - - global _PARDICT_CACHE, _PARFILE_CACHE - - if not parfile.is_file(): - LOGGER.error(f"{parfile} not found") - value = '' - - elif not is_parfile(parfile): - LOGGER.warning(f"{parfile} is not a PAR/XML file, cannot read {tagname}") - value = '' - - else: - try: - from nibabel.parrec import parse_PAR_header - - if parfile != _PARFILE_CACHE: - pardict = parse_PAR_header(parfile.open('r')) - if 'series_type' not in pardict[0]: - raise ValueError(f'Cannot read {parfile}') - _PARDICT_CACHE = pardict - _PARFILE_CACHE = parfile - else: - pardict = _PARDICT_CACHE - value = pardict[0].get(tagname, '') - - except (IOError, OSError): - LOGGER.warning(f'Cannot read {tagname} from {parfile}') - value = '' - - except Exception as parerror: - LOGGER.warning(f'Could not parse {tagname} from {parfile}\n{parerror}') - value = '' - - # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) - if isinstance(value, int): - return int(value) - elif value is None: - return '' - else: - return str(value) # If it's a MultiValue type then flatten it - - -# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) cache optimization -_SPARHDR_CACHE = _SPARFILE_CACHE = None -@lru_cache(maxsize=65536) -def get_sparfield(tagname: str, sparfile: Path) -> Union[str, int]: - """ - Extracts the field value from the SPAR header-file - - :param tagname: Name of the SPAR field - :param sparfile: The full pathname of the SPAR file - :return: Extracted tag-values from the SPAR file - """ - - global _SPARHDR_CACHE, _SPARFILE_CACHE - - value = '' - if not sparfile.is_file(): - LOGGER.error(f"{sparfile} not found") - - else: - try: - if sparfile != _SPARFILE_CACHE: - - from spec2nii.Philips.philips import read_spar - - hdr = read_spar(sparfile) - _SPARHDR_CACHE = hdr - _SPARFILE_CACHE = sparfile - else: - hdr = _SPARHDR_CACHE - - value = hdr.get(tagname, '') - - except ImportError: - LOGGER.warning(f"The extra `spec2nii` library could not be found or was not installed (see the BIDScoin install instructions)") - - except (IOError, OSError): - LOGGER.warning(f"Cannot read {tagname} from {sparfile}") - - except Exception as sparerror: - LOGGER.warning(f"Could not parse {tagname} from {sparfile}\n{sparerror}") - - # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) - if isinstance(value, int): - return int(value) - elif value is None: - return '' - else: - return str(value) # If it's a MultiValue type then flatten it - - -# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) cache optimization -_P7HDR_CACHE = _P7FILE_CACHE = None -@lru_cache(maxsize=65536) -def get_p7field(tagname: str, p7file: Path) -> Union[str, int]: - """ - Extracts the field value from the P-file header - - :param tagname: Name of the SPAR field - :param p7file: The full pathname of the P7 file - :return: Extracted tag-values from the P7 file - """ - - global _P7HDR_CACHE, _P7FILE_CACHE - - value = '' - if not p7file.is_file(): - LOGGER.error(f"{p7file} not found") - - else: - try: - if p7file != _P7FILE_CACHE: - - from spec2nii.GE.ge_read_pfile import Pfile - - hdr = Pfile(p7file).hdr - _P7HDR_CACHE = hdr - _P7FILE_CACHE = p7file - else: - hdr = _P7HDR_CACHE - - value = getattr(hdr, tagname, '') - if type(value) is bytes: - try: value = value.decode('UTF-8') - except UnicodeDecodeError: pass - - except ImportError: - LOGGER.warning(f"The extra `spec2nii` library could not be found or was not installed (see the BIDScoin install instructions)") - - except (IOError, OSError): - LOGGER.warning(f'Cannot read {tagname} from {p7file}') - - except Exception as p7error: - LOGGER.warning(f'Could not parse {tagname} from {p7file}\n{p7error}') - - # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) - if isinstance(value, int): - return int(value) - elif value is None: - return '' - else: - return str(value) # If it's a MultiValue type then flatten it diff --git a/bidscoin/plugins/dcm2niix2bids.py b/bidscoin/plugins/dcm2niix2bids.py index e7e56b62..92d167a5 100644 --- a/bidscoin/plugins/dcm2niix2bids.py +++ b/bidscoin/plugins/dcm2niix2bids.py @@ -14,9 +14,9 @@ from typing import Union from pathlib import Path from bidscoin import bids, run_command, lsdirs, due, Doi -from bidscoin.utilities import physio +from bidscoin.utilities import physio, is_dicomfile, is_parfile, get_dicomfield, get_parfield, get_dicomfile, get_parfiles from bidscoin.bids import BidsMap, DataFormat, Plugin, Plugins -from bidscoin.plugins import PluginInterface, is_dicomfile, is_parfile, get_dicomfield, get_parfield, get_dicomfile, get_parfiles +from bidscoin.plugins import PluginInterface try: from nibabel.testing import data_path except ImportError: @@ -92,7 +92,6 @@ def has_support(self, file: Path, dataformat: Union[DataFormat, str]='') -> str: return '' - def get_attribute(self, dataformat: Union[DataFormat, str], sourcefile: Path, attribute: str, options) -> Union[str, int]: """ This plugin supports reading attributes from DICOM and PAR dataformats @@ -109,7 +108,6 @@ def get_attribute(self, dataformat: Union[DataFormat, str], sourcefile: Path, at if dataformat == 'PAR': return get_parfield(attribute, sourcefile) - def bidsmapper(self, session: Path, bidsmap_new: BidsMap, bidsmap_old: BidsMap, template: BidsMap) -> None: """ The goal of this plugin function is to identify all the different runs in the session and update the @@ -190,7 +188,6 @@ def bidsmapper(self, session: Path, bidsmap_new: BidsMap, bidsmap_old: BidsMap, else: LOGGER.bcdebug(f"Existing/duplicate sample: {run.datasource}") - @due.dcite(Doi('10.1016/j.jneumeth.2016.03.001'), description='dcm2niix: DICOM to NIfTI converter', tags=['reference-implementation']) def bidscoiner(self, session: Path, bidsmap: BidsMap, bidsses: Path) -> Union[None, dict]: """ @@ -206,9 +203,8 @@ def bidscoiner(self, session: Path, bidsmap: BidsMap, bidsses: Path) -> Union[No """ # Get the subject identifiers and the BIDS root folder from the bidsses folder - subid = bidsses.name if bidsses.name.startswith('sub-') else bidsses.parent.name - sesid = bidsses.name if bidsses.name.startswith('ses-') else '' - bidsfolder = bidsses.parent.parent if sesid else bidsses.parent + subid = bidsses.name if bidsses.name.startswith('sub-') else bidsses.parent.name + sesid = bidsses.name if bidsses.name.startswith('ses-') else '' # Get started and see what dataformat we have options = Plugin(bidsmap.plugins['dcm2niix2bids']) @@ -222,13 +218,10 @@ def bidscoiner(self, session: Path, bidsmap: BidsMap, bidsses: Path) -> Union[No # Make a list of all the data sources/runs sources: list[Path] = [] - manufacturer = 'UNKNOWN' if dataformat == 'DICOM': - sources = lsdirs(session, '**/*') - manufacturer = datasource.attributes('Manufacturer') + sources = lsdirs(session, '**/*') elif dataformat == 'PAR': - sources = get_parfiles(session) - manufacturer = 'Philips Medical Systems' + sources = get_parfiles(session) else: LOGGER.error(f"Unsupported dataformat '{dataformat}'") diff --git a/bidscoin/plugins/nibabel2bids.py b/bidscoin/plugins/nibabel2bids.py index fd5ccc98..7f1c8b0f 100644 --- a/bidscoin/plugins/nibabel2bids.py +++ b/bidscoin/plugins/nibabel2bids.py @@ -62,7 +62,6 @@ def test(self, options: Plugin=OPTIONS) -> int: return 0 - def has_support(self, file: Path, dataformat: Union[DataFormat, str]='') -> str: """ This plugin function assesses whether a sourcefile is of a supported dataformat @@ -81,7 +80,6 @@ def has_support(self, file: Path, dataformat: Union[DataFormat, str]='') -> str: return '' - def get_attribute(self, dataformat: Union[DataFormat, str], sourcefile: Path, attribute: str, options: Plugin) -> Union[str, int, float, list]: """ This plugin supports reading attributes from DICOM and PAR dataformats @@ -107,7 +105,6 @@ def get_attribute(self, dataformat: Union[DataFormat, str], sourcefile: Path, at return value - def bidscoiner(self, session: Path, bidsmap: BidsMap, bidsses: Path) -> None: """ The bidscoiner plugin to convert the session Nibabel source-files into BIDS-valid NIfTI-files in the diff --git a/bidscoin/plugins/spec2nii2bids.py b/bidscoin/plugins/spec2nii2bids.py index 366b9035..7c34098a 100644 --- a/bidscoin/plugins/spec2nii2bids.py +++ b/bidscoin/plugins/spec2nii2bids.py @@ -12,7 +12,8 @@ from pathlib import Path from bidscoin import run_command, is_hidden, bids, due, Doi from bidscoin.bids import BidsMap, DataFormat, Plugin -from bidscoin.plugins import PluginInterface, is_dicomfile, get_twixfield, get_sparfield, get_p7field +from bidscoin.plugins import PluginInterface +from bidscoin.utilities import is_dicomfile, get_twixfield, get_sparfield, get_p7field LOGGER = logging.getLogger(__name__) @@ -45,7 +46,6 @@ def test(self, options: Plugin=OPTIONS) -> int: # Test the spec2nii installation return run_command(f"{options.get('command',OPTIONS['command'])} -v") - def has_support(self, file: Path, dataformat: Union[DataFormat, str]='') -> str: """ This plugin function assesses whether a sourcefile is of a supported dataformat @@ -68,7 +68,6 @@ def has_support(self, file: Path, dataformat: Union[DataFormat, str]='') -> str: return '' - def get_attribute(self, dataformat: Union[DataFormat, str], sourcefile: Path, attribute: str, options: Plugin) -> str: """ This plugin function reads attributes from the supported sourcefile diff --git a/bidscoin/utilities/__init__.py b/bidscoin/utilities/__init__.py index ef0d29cd..c4316306 100644 --- a/bidscoin/utilities/__init__.py +++ b/bidscoin/utilities/__init__.py @@ -1 +1,561 @@ -"""A package of useful helper tools""" \ No newline at end of file +"""A package of useful helper tools""" + +import tempfile +import warnings +import re +import shutil +import logging +from typing import Union +from pathlib import Path +from pydicom import dcmread, fileset, config +from functools import lru_cache +from importlib.util import find_spec +from bidscoin import is_hidden, lsdirs, DEBUG + +config.INVALID_KEY_BEHAVIOR = 'IGNORE' + +LOGGER = logging.getLogger(__name__) + + +def unpack(sesfolder: Path, wildcard: str='', workfolder: Path='', _subprefix: Union[str,None]='') -> tuple[set[Path], bool]: + """ + Unpacks and sorts DICOM files in sourcefolder to a temporary folder if sourcefolder contains a DICOMDIR file or .tar.gz, .gz or .zip files + + :param sesfolder: The full pathname of the folder with the source data + :param wildcard: A glob search pattern to select the tarball/zipped files (leave empty to skip unzipping) + :param workfolder: A root folder for temporary data + :param _subprefix: A pytest helper variable that is passed to dicomsort.sortsessions(args, subprefix=_subprefix) + :return: Either ({a set of unpacked session folders}, True), or ({sourcefolder}, False) + """ + + # Avoid circular import + from bidscoin.utilities import dicomsort + + # Search for zipped/tarball files + tarzipfiles = [*sesfolder.glob(wildcard)] if wildcard else [] + + # See if we have a flat unsorted (DICOM) data organization, i.e. no directories, but DICOM-files + flatDICOM = not lsdirs(sesfolder) and get_dicomfile(sesfolder).is_file() + + # Check if we are going to do unpacking and/or sorting + if tarzipfiles or flatDICOM or (sesfolder/'DICOMDIR').is_file(): + + if tarzipfiles: + LOGGER.info(f"Found zipped/tarball data in: {sesfolder}") + else: + LOGGER.info(f"Detected a {'flat' if flatDICOM else 'DICOMDIR'} data-structure in: {sesfolder}") + + # Create a (temporary) sub/ses workfolder for unpacking the data + if not workfolder: + workfolder = Path(tempfile.mkdtemp(dir=tempfile.gettempdir())) + else: + workfolder = Path(workfolder)/next(tempfile._get_candidate_names()) + worksesfolder = workfolder/sesfolder.relative_to(sesfolder.anchor) + worksesfolder.mkdir(parents=True, exist_ok=True) + + # Copy everything over to the workfolder + LOGGER.info(f"Making temporary copy: {sesfolder} -> {worksesfolder}") + shutil.copytree(sesfolder, worksesfolder, dirs_exist_ok=True) + + # Unpack the zip/tarball files in the temporary folder + sessions: set[Path] = set() + for tarzipfile in [worksesfolder/tarzipfile.name for tarzipfile in tarzipfiles]: + LOGGER.info(f"Unpacking: {tarzipfile.name} -> {worksesfolder}") + try: + shutil.unpack_archive(tarzipfile, worksesfolder) + except Exception as unpackerror: + LOGGER.warning(f"Could not unpack: {tarzipfile}\n{unpackerror}") + continue + + # Sort the DICOM files in the worksesfolder root immediately (to avoid name collisions) + if not (worksesfolder/'DICOMDIR').is_file() and get_dicomfile(worksesfolder).name: + sessions.update(dicomsort.sortsessions(worksesfolder, _subprefix, recursive=False)) + + # Sort the DICOM files if not sorted yet (e.g. DICOMDIR) + sessions.update(dicomsort.sortsessions(worksesfolder, _subprefix, recursive=True)) + + return sessions, True + + else: + + return {sesfolder}, False + + +@lru_cache(maxsize=65536) +def is_dicomfile(file: Path) -> bool: + """ + Checks whether a file is a DICOM-file. As a quick test, it first uses the feature that Dicoms have the + string DICM hardcoded at offset 0x80. If that fails and (the file has a normal DICOM extension, e.g. '.dcm') + then it is tested whether pydicom can read the file + + :param file: The full pathname of the file + :return: Returns true if a file is a DICOM-file + """ + + if file.is_file(): + + # Perform a quick test first + with file.open('rb') as dicomfile: + dicomfile.seek(0x80, 1) + if dicomfile.read(4) == b'DICM': + return True + + # Perform a proof of the pudding test (but avoid memory problems when reading a very large (e.g. EEG) source file) + if file.suffix.lower() in ('.ima','.dcm','.dicm','.dicom',''): + if file.name == 'DICOMDIR': + return True + dicomdata = dcmread(file, force=True) # The DICM tag may be missing for anonymized DICOM files. NB: Force=False raises an error for non-DICOM files + return 'Modality' in dicomdata + + return False + + +@lru_cache(maxsize=65536) +def is_parfile(file: Path) -> bool: + """ + Rudimentary check (on file extensions and whether it exists) whether a file is a Philips PAR file + + :param file: The full pathname of the file + :return: Returns true if a file is a Philips PAR-file + """ + + # TODO: Implement a proper check, e.g. using nibabel + try: + if file.is_file() and file.suffix.lower() == '.par' and '# CLINICAL TRYOUT' in file.read_text(): + return True + elif file.is_file() and file.suffix.lower() == '.xml': + return True + except (OSError, IOError, UnicodeDecodeError) as ioerror: + pass + + return False + + +def get_dicomfile(folder: Path, index: int=0) -> Path: + """ + Gets a dicom-file from the folder (supports DICOMDIR) + + :param folder: The full pathname of the folder + :param index: The index number of the dicom file + :return: The filename of the first dicom-file in the folder. + """ + + if is_hidden(Path(folder.name)): + return Path() + + if (folder/'DICOMDIR').is_file(): + dicomdir = fileset.FileSet(folder/'DICOMDIR') + files = [Path(file.path) for file in dicomdir] + else: + files = sorted(folder.iterdir()) + + idx = 0 + for file in files: + if not is_hidden(file.relative_to(folder)) and is_dicomfile(file): + if idx == index: + return file + else: + idx += 1 + + return Path() + + +def get_parfiles(folder: Path) -> list[Path]: + """ + Gets the Philips PAR-file from the folder + + :param folder: The full pathname of the folder + :return: The filenames of the PAR-files in the folder. + """ + + if is_hidden(Path(folder.name)): + return [] + + parfiles: list[Path] = [] + for file in sorted(folder.iterdir()): + if not is_hidden(file.relative_to(folder)) and is_parfile(file): + parfiles.append(file) + + return parfiles + + +@lru_cache(maxsize=65536) +def parse_x_protocol(pattern: str, dicomfile: Path) -> str: + """ + Siemens writes a protocol structure as text into each DICOM file. + This structure is necessary to recreate a scanning protocol from a DICOM, + since the DICOM information alone wouldn't be sufficient. + + This function is derived from dac2bids.py from Daniel Gomez 29.08.2016 + https://github.com/dangom/dac2bids/blob/master/dac2bids.py + + :param pattern: A regular expression: '^' + pattern + '\t = \t(.*)\\n' + :param dicomfile: The full pathname of the dicom-file + :return: The string extracted values from the dicom-file according to the given pattern + """ + + regexp = '^' + pattern + '\t = \t(.*)\n' + regex = re.compile(regexp.encode('utf-8')) + + with dicomfile.open('rb') as openfile: + for line in openfile: + match = regex.match(line) + if match: + return match.group(1).decode('utf-8') + + LOGGER.warning(f"Pattern: '{regexp.encode('unicode_escape').decode()}' not found in: {dicomfile}") + return '' + + +# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) _DICOMDICT_CACHE optimization +_DICOMDICT_CACHE = _DICOMFILE_CACHE = None +@lru_cache(maxsize=65536) +def get_dicomfield(tagname: str, dicomfile: Path) -> Union[str, int]: + """ + Robustly extracts a DICOM attribute/tag value from a dictionary or from vendor specific fields. + + A XA-20 enhanced DICOM hack is made, i.e. if `EchoNumbers` is empty then an attempt is made to + read it from the ICE dims (see https://github.com/rordenlab/dcm2niix/blob/master/Siemens/README.md) + + Another hack is to get 'PhaseEncodingDirection` (see https://neurostars.org/t/determining-bids-phaseencodingdirection-from-dicom/612/10) + + :param tagname: DICOM attribute name (e.g. 'SeriesNumber') or Pydicom-style tag number (e.g. '0x00200011', '(0x20,0x11)', '(0020,0011)') + :param dicomfile: The full pathname of the dicom-file + :return: Extracted tag-values as a flat string + """ + + global _DICOMDICT_CACHE, _DICOMFILE_CACHE + + # Skip the RunItem properties + if tagname in ('provenance', 'properties', 'attributes', 'bids', 'meta', 'events'): # = RunItem().properties but that creates a circular import + return '' + + if not dicomfile.is_file(): + LOGGER.warning(f"{dicomfile} not found") + value = '' + + elif not is_dicomfile(dicomfile): + LOGGER.warning(f"{dicomfile} is not a DICOM file, cannot read {tagname}") + value = '' + + else: + with warnings.catch_warnings(): + if not DEBUG: warnings.simplefilter('ignore', UserWarning) + from nibabel.nicom import csareader + try: + if dicomfile != _DICOMFILE_CACHE: + if dicomfile.name == 'DICOMDIR': + LOGGER.bcdebug(f"Getting DICOM fields from {dicomfile} seems dysfunctional (will raise dcmread error below if pydicom => v3.0)") + dicomdata = dcmread(dicomfile, force=True) # The DICM tag may be missing for anonymized DICOM files + _DICOMDICT_CACHE = dicomdata + _DICOMFILE_CACHE = dicomfile + else: + dicomdata = _DICOMDICT_CACHE + + if re.fullmatch(r'\(?0x[\dA-F]*,?(0x)?[\dA-F]*\)?', tagname): # Try Pydicom's hexadecimal tag number first (must be a 2-tuple or int) + value = eval(f"dicomdata[{tagname}].value") # NB: This may generate e.g. UserWarning: Invalid value 'filepath' used with the 'in' operator: must be an element tag as a 2-tuple or int, or an element keyword + else: + value = dicomdata.get(tagname,'') if tagname in dicomdata else '' # Then try and see if it is an attribute name. NB: Do not use dicomdata.get(tagname, '') to avoid using its class attributes (e.g. 'filename') + + # Try a recursive search + if not value and value != 0: + for elem in dicomdata.iterall(): + if tagname in (elem.name, elem.keyword, str(elem.tag), str(elem.tag).replace(', ',',')): + value = elem.value + break + + if dicomdata.get('Modality') == 'MR': + + # PhaseEncodingDirection patch (see https://neurostars.org/t/determining-bids-phaseencodingdirection-from-dicom/612/10) + if tagname == 'PhaseEncodingDirection' and not value: + if 'SIEMENS' in dicomdata.get('Manufacturer').upper() and csareader.get_csa_header(dicomdata): + csa = csareader.get_csa_header(dicomdata, 'Image')['tags'] + pos = csa.get('PhaseEncodingDirectionPositive',{}).get('items') # = [0] or [1] + dir = dicomdata.get('InPlanePhaseEncodingDirection') # = ROW or COL + if dir == 'COL' and pos: + value = 'AP' if pos[0] else 'PA' + elif dir == 'ROW' and pos: + value = 'LR' if pos[0] else 'RL' + elif dicomdata.get('Manufacturer','').upper().startswith('GE'): + value = dicomdata.get('RectilinearPhaseEncodeReordering') # = LINEAR or REVERSE_LINEAR + + # XA enhanced DICOM hack: Catch missing EchoNumbers from the private ICE_Dims field (0x21, 0x1106) + if tagname in ('EchoNumber', 'EchoNumbers') and not value: + for elem in dicomdata.iterall(): + if elem.tag == (0x21,0x1106): + value = elem.value.split('_')[1] if '_' in elem.value else '' + LOGGER.bcdebug(f"Parsed `EchoNumber(s)` from Siemens ICE_Dims `{elem.value}` as: {value}") + break + + # Try reading the Siemens CSA header. For VA/VB-versions the CSA header tag is (0029,1020), for XA-versions (0021,1019). TODO: see if dicom_parser is supporting this + if not value and value != 0 and 'SIEMENS' in dicomdata.get('Manufacturer').upper() and csareader.get_csa_header(dicomdata): + + if find_spec('dicom_parser'): + from dicom_parser import Image + LOGGER.bcdebug(f"Parsing {tagname} from the CSA header using `dicom_parser`") + for csa in ('CSASeriesHeaderInfo', 'CSAImageHeaderInfo'): + value = value if (value or value==0) else Image(dicomfile).header.get(csa) + for csatag in tagname.split('.'): # E.g. CSA tagname = 'SliceArray.Slice.instance_number.Position.Tra' + if isinstance(value, dict): # Final CSA header attributes in dictionary of dictionaries + value = value.get(csatag, '') + if 'value' in value: # Normal CSA (i.e. not MrPhoenixProtocol) + value = value['value'] + if value != 0: + value = str(value or '') + + else: + LOGGER.bcdebug(f"Parsing {tagname} from the CSA header using `nibabel`") + for modality in ('Series', 'Image'): + value = value if (value or value==0) else csareader.get_csa_header(dicomdata, modality)['tags'] + for csatag in tagname.split('.'): # NB: Currently MrPhoenixProtocol is not supported + if isinstance(value, dict): # Final CSA header attributes in dictionary of dictionaries + value = value.get(csatag, {}).get('items', '') + if isinstance(value, list) and len(value) == 1: + value = value[0] + if value != 0: + value = str(value or '') + + if not value and value != 0 and 'Modality' not in dicomdata: + raise ValueError(f"Missing mandatory DICOM 'Modality' field in: {dicomfile}") + + except (IOError, OSError) as ioerror: + LOGGER.warning(f"Cannot read {tagname} from {dicomfile}\n{ioerror}") + value = '' + + except Exception as dicomerror: + LOGGER.warning(f"Could not read {tagname} from {dicomfile}\n{dicomerror}") + value = '' + + # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) + if isinstance(value, int): + return int(value) + elif value is None: + return '' + else: + return str(value) # If it's a MultiValue type then flatten it + + +# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) cache optimization +_TWIXHDR_CACHE = _TWIXFILE_CACHE = None +@lru_cache(maxsize=65536) +def get_twixfield(tagname: str, twixfile: Path, mraid: int=2) -> Union[str, int]: + """ + Recursively searches the TWIX file to extract the field value + + :param tagname: Name of the TWIX field + :param twixfile: The full pathname of the TWIX file + :param mraid: The mapVBVD argument for selecting the multiraid file to load (default = 2, i.e. 2nd file) + :return: Extracted tag-values from the TWIX file + """ + + global _TWIXHDR_CACHE, _TWIXFILE_CACHE + + if not twixfile.is_file(): + LOGGER.error(f"{twixfile} not found") + value = '' + + else: + try: + if twixfile != _TWIXFILE_CACHE: + + from mapvbvd import mapVBVD + + twixObj = mapVBVD(twixfile, quiet=True) + if isinstance(twixObj, list): + twixObj = twixObj[mraid - 1] + hdr = twixObj['hdr'] + _TWIXHDR_CACHE = hdr + _TWIXFILE_CACHE = twixfile + else: + hdr = _TWIXHDR_CACHE + + def iterget(item, key): + if isinstance(item, dict): + + # First check to see if we can get the key-value data from the item + val = item.get(key, '') + if val and not isinstance(val, dict): + return val + + # Loop over the item to see if we can get the key-value from the sub-items + if isinstance(item, dict): + for ds in item: + val = iterget(item[ds], key) + if val: + return val + + return '' + + value = iterget(hdr, tagname) + + except (IOError, OSError): + LOGGER.warning(f'Cannot read {tagname} from {twixfile}') + value = '' + + except Exception as twixerror: + LOGGER.warning(f'Could not parse {tagname} from {twixfile}\n{twixerror}') + value = '' + + # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) + if isinstance(value, int): + return int(value) + elif value is None: + return '' + else: + return str(value) # If it's a MultiValue type then flatten it + + +# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) _PARDICT_CACHE optimization +_PARDICT_CACHE = _PARFILE_CACHE = None +@lru_cache(maxsize=65536) +def get_parfield(tagname: str, parfile: Path) -> Union[str, int]: + """ + Uses nibabel to extract the value from a PAR field (NB: nibabel does not yet support XML) + + :param tagname: Name of the PAR/XML field + :param parfile: The full pathname of the PAR/XML file + :return: Extracted tag-values from the PAR/XML file + """ + + global _PARDICT_CACHE, _PARFILE_CACHE + + if not parfile.is_file(): + LOGGER.error(f"{parfile} not found") + value = '' + + elif not is_parfile(parfile): + LOGGER.warning(f"{parfile} is not a PAR/XML file, cannot read {tagname}") + value = '' + + else: + try: + from nibabel.parrec import parse_PAR_header + + if parfile != _PARFILE_CACHE: + pardict = parse_PAR_header(parfile.open('r')) + if 'series_type' not in pardict[0]: + raise ValueError(f'Cannot read {parfile}') + _PARDICT_CACHE = pardict + _PARFILE_CACHE = parfile + else: + pardict = _PARDICT_CACHE + value = pardict[0].get(tagname, '') + + except (IOError, OSError): + LOGGER.warning(f'Cannot read {tagname} from {parfile}') + value = '' + + except Exception as parerror: + LOGGER.warning(f'Could not parse {tagname} from {parfile}\n{parerror}') + value = '' + + # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) + if isinstance(value, int): + return int(value) + elif value is None: + return '' + else: + return str(value) # If it's a MultiValue type then flatten it + + +# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) cache optimization +_SPARHDR_CACHE = _SPARFILE_CACHE = None +@lru_cache(maxsize=65536) +def get_sparfield(tagname: str, sparfile: Path) -> Union[str, int]: + """ + Extracts the field value from the SPAR header-file + + :param tagname: Name of the SPAR field + :param sparfile: The full pathname of the SPAR file + :return: Extracted tag-values from the SPAR file + """ + + global _SPARHDR_CACHE, _SPARFILE_CACHE + + value = '' + if not sparfile.is_file(): + LOGGER.error(f"{sparfile} not found") + + else: + try: + if sparfile != _SPARFILE_CACHE: + + from spec2nii.Philips.philips import read_spar + + hdr = read_spar(sparfile) + _SPARHDR_CACHE = hdr + _SPARFILE_CACHE = sparfile + else: + hdr = _SPARHDR_CACHE + + value = hdr.get(tagname, '') + + except ImportError: + LOGGER.warning(f"The extra `spec2nii` library could not be found or was not installed (see the BIDScoin install instructions)") + + except (IOError, OSError): + LOGGER.warning(f"Cannot read {tagname} from {sparfile}") + + except Exception as sparerror: + LOGGER.warning(f"Could not parse {tagname} from {sparfile}\n{sparerror}") + + # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) + if isinstance(value, int): + return int(value) + elif value is None: + return '' + else: + return str(value) # If it's a MultiValue type then flatten it + + +# Profiling shows this is currently the most expensive function, therefore the (primitive but effective) cache optimization +_P7HDR_CACHE = _P7FILE_CACHE = None +@lru_cache(maxsize=65536) +def get_p7field(tagname: str, p7file: Path) -> Union[str, int]: + """ + Extracts the field value from the P-file header + + :param tagname: Name of the SPAR field + :param p7file: The full pathname of the P7 file + :return: Extracted tag-values from the P7 file + """ + + global _P7HDR_CACHE, _P7FILE_CACHE + + value = '' + if not p7file.is_file(): + LOGGER.error(f"{p7file} not found") + + else: + try: + if p7file != _P7FILE_CACHE: + + from spec2nii.GE.ge_read_pfile import Pfile + + hdr = Pfile(p7file).hdr + _P7HDR_CACHE = hdr + _P7FILE_CACHE = p7file + else: + hdr = _P7HDR_CACHE + + value = getattr(hdr, tagname, '') + if type(value) is bytes: + try: value = value.decode('UTF-8') + except UnicodeDecodeError: pass + + except ImportError: + LOGGER.warning(f"The extra `spec2nii` library could not be found or was not installed (see the BIDScoin install instructions)") + + except (IOError, OSError): + LOGGER.warning(f'Cannot read {tagname} from {p7file}') + + except Exception as p7error: + LOGGER.warning(f'Could not parse {tagname} from {p7file}\n{p7error}') + + # Cast the dicom data type to int or str (i.e. to something that yaml.dump can handle) + if isinstance(value, int): + return int(value) + elif value is None: + return '' + else: + return str(value) # If it's a MultiValue type then flatten it diff --git a/bidscoin/utilities/bidsparticipants.py b/bidscoin/utilities/bidsparticipants.py index f3470432..5dd99ebd 100755 --- a/bidscoin/utilities/bidsparticipants.py +++ b/bidscoin/utilities/bidsparticipants.py @@ -12,7 +12,7 @@ sys.path.append(str(Path(__file__).parents[2])) from bidscoin import bcoin, bids, lsdirs, trackusage, __version__ from bidscoin.bids import BidsMap -from bidscoin.plugins import unpack +from bidscoin.utilities import unpack def scanpersonals(bidsmap: BidsMap, session: Path, personals: dict, keys: list) -> bool: diff --git a/bidscoin/utilities/dicomsort.py b/bidscoin/utilities/dicomsort.py index 29556e9f..8b2a71af 100755 --- a/bidscoin/utilities/dicomsort.py +++ b/bidscoin/utilities/dicomsort.py @@ -12,6 +12,7 @@ import sys sys.path.append(str(Path(__file__).parents[2])) from bidscoin import bcoin, lsdirs, trackusage +from bidscoin.utilities import get_dicomfield LOGGER = logging.getLogger(__name__) @@ -27,9 +28,6 @@ def construct_name(scheme: str, dicomfile: Path, force: bool) -> str: :return: The new name constructed from the scheme """ - # Avoid circular import from bidscoin.plugins - from bidscoin.plugins import get_dicomfield - # Alternative field names based on earlier DICOM versions or on other reasons alternatives = {'PatientName':'PatientsName', 'SeriesDescription':'ProtocolName', 'InstanceNumber':'ImageNumber', 'SeriesNumber':'SeriesInstanceUID', 'PatientsName':'PatientName', 'ProtocolName':'SeriesDescription', 'ImageNumber':'InstanceNumber'} diff --git a/bidscoin/utilities/rawmapper.py b/bidscoin/utilities/rawmapper.py index 94a6b0e2..1632ca0c 100755 --- a/bidscoin/utilities/rawmapper.py +++ b/bidscoin/utilities/rawmapper.py @@ -9,7 +9,7 @@ import sys sys.path.append(str(Path(__file__).parents[2])) from bidscoin import lsdirs, bids, trackusage -from bidscoin.plugins import get_dicomfield, get_dicomfile +from bidscoin.utilities import get_dicomfield, get_dicomfile def rawmapper(sourcefolder, outfolder: str='', sessions: tuple=(), rename: bool=False, clobber: bool=False, field: tuple=('PatientComments',), wildcard: str= '*', subprefix: str= 'sub-', sesprefix: str= 'ses-', dryrun: bool=False) -> None: diff --git a/tests/test_plugins.py b/tests/test_plugins.py index ab50f316..e87ecfa0 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -6,37 +6,12 @@ import pytest import inspect -import bidscoin.plugins as plugins -from importlib.util import find_spec -from pathlib import Path -from pydicom.data import get_testdata_file -from nibabel.testing import data_path from bidscoin import bcoin, bids, bidsmap_template bcoin.setup_logging() template = bids.BidsMap(bidsmap_template, checks=(False, False, False)) -@pytest.fixture(scope='module') -def dcm_file(): - return Path(get_testdata_file('MR_small.dcm')) - - -@pytest.fixture(scope='module') -def dcm_file_csa(): - return Path(data_path)/'1.dcm' - - -@pytest.fixture(scope='module') -def dicomdir(): - return Path(get_testdata_file('DICOMDIR')) - - -@pytest.fixture(scope='module') -def par_file(): - return Path(data_path)/'phantom_EPI_asc_CLEAR_2_1.PAR' - - # Test all plugins using the template & default options @pytest.mark.parametrize('plugin', bcoin.list_plugins()[0]) @pytest.mark.parametrize('options', [template.plugins, {}]) @@ -54,67 +29,3 @@ def test_plugin(plugin, options): module = bcoin.import_plugin(plugin, ('foo_plugin', 'bar_plugin')) if module is not None: raise ImportError(f"Unintended plugin import: '{plugin}'") - - -def test_unpack(dicomdir, tmp_path): - sessions, unpacked = plugins.unpack(dicomdir.parent, '', tmp_path, None) # None -> simulate commandline usage of dicomsort() - assert unpacked - assert len(sessions) == 6 - for session in sessions: - assert 'Doe^Archibald' in session.parts or 'Doe^Peter' in session.parts - - -def test_is_dicomfile(dcm_file): - assert plugins.is_dicomfile(dcm_file) - - -def test_is_parfile(par_file): - assert plugins.is_parfile(par_file) - - -def test_get_dicomfile(dcm_file, dicomdir): - assert plugins.get_dicomfile(dcm_file.parent).name == '693_J2KI.dcm' - assert plugins.get_dicomfile(dicomdir.parent).name == '6154' - - -def test_get_dicomfield(dcm_file_csa): - - # -> Standard DICOM - value = plugins.get_dicomfield('SeriesDescription', dcm_file_csa) - assert value == 'CBU_DTI_64D_1A' - - # -> The pydicom-style tag number - value = plugins.get_dicomfield('SeriesNumber', dcm_file_csa) - assert value == 12 - assert value == plugins.get_dicomfield('0x00200011', dcm_file_csa) - assert value == plugins.get_dicomfield('(0x20,0x11)', dcm_file_csa) - assert value == plugins.get_dicomfield('(0020,0011)', dcm_file_csa) - - # -> The special PhaseEncodingDirection tag - value = plugins.get_dicomfield('PhaseEncodingDirection', dcm_file_csa) - assert value == 'AP' - - # -> CSA Series header - value = plugins.get_dicomfield('PhaseGradientAmplitude', dcm_file_csa) - assert value == '0.0' - - # -> CSA Image header - value = plugins.get_dicomfield('ImaCoilString', dcm_file_csa) - assert value == 'T:HEA;HEP' - - value = plugins.get_dicomfield('B_matrix', dcm_file_csa) - assert value == '' - - value = plugins.get_dicomfield('NonExistingTag', dcm_file_csa) - assert value == '' - - # -> CSA MrPhoenixProtocol - if find_spec('dicom_parser'): - value = plugins.get_dicomfield('MrPhoenixProtocol.tProtocolName', dcm_file_csa) - assert value == 'CBU+AF8-DTI+AF8-64D+AF8-1A' - - value = plugins.get_dicomfield('MrPhoenixProtocol.sDiffusion', dcm_file_csa) - assert value == "{'lDiffWeightings': 2, 'alBValue': [None, 1000], 'lNoiseLevel': 40, 'lDiffDirections': 64, 'ulMode': 256}" - - value = plugins.get_dicomfield('MrPhoenixProtocol.sProtConsistencyInfo.tBaselineString', dcm_file_csa) - assert value == 'N4_VB17A_LATEST_20090307' diff --git a/tests/test_utilities.py b/tests/test_utilities.py index d2c04543..b5bc4803 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -1,13 +1,101 @@ +import pytest import shutil import csv from pydicom.data import get_testdata_file +from nibabel.testing import data_path from pathlib import Path from bidscoin import bcoin +from bidscoin import utilities from bidscoin.utilities import dicomsort, rawmapper, bidsparticipants +from importlib.util import find_spec bcoin.setup_logging() +@pytest.fixture(scope='module') +def dcm_file(): + return Path(get_testdata_file('MR_small.dcm')) + + +@pytest.fixture(scope='module') +def dcm_file_csa(): + return Path(data_path)/'1.dcm' + + +@pytest.fixture(scope='module') +def dicomdir(): + return Path(get_testdata_file('DICOMDIR')) + + +@pytest.fixture(scope='module') +def par_file(): + return Path(data_path)/'phantom_EPI_asc_CLEAR_2_1.PAR' + + +def test_unpack(dicomdir, tmp_path): + sessions, unpacked = utilities.unpack(dicomdir.parent, '', tmp_path, None) # None -> simulate commandline usage of dicomsort() + assert unpacked + assert len(sessions) == 6 + for session in sessions: + assert 'Doe^Archibald' in session.parts or 'Doe^Peter' in session.parts + + +def test_is_dicomfile(dcm_file): + assert utilities.is_dicomfile(dcm_file) + + +def test_is_parfile(par_file): + assert utilities.is_parfile(par_file) + + +def test_get_dicomfile(dcm_file, dicomdir): + assert utilities.get_dicomfile(dcm_file.parent).name == '693_J2KI.dcm' + assert utilities.get_dicomfile(dicomdir.parent).name == '6154' + + +def test_get_dicomfield(dcm_file_csa): + + # -> Standard DICOM + value = utilities.get_dicomfield('SeriesDescription', dcm_file_csa) + assert value == 'CBU_DTI_64D_1A' + + # -> The pydicom-style tag number + value = utilities.get_dicomfield('SeriesNumber', dcm_file_csa) + assert value == 12 + assert value == utilities.get_dicomfield('0x00200011', dcm_file_csa) + assert value == utilities.get_dicomfield('(0x20,0x11)', dcm_file_csa) + assert value == utilities.get_dicomfield('(0020,0011)', dcm_file_csa) + + # -> The special PhaseEncodingDirection tag + value = utilities.get_dicomfield('PhaseEncodingDirection', dcm_file_csa) + assert value == 'AP' + + # -> CSA Series header + value = utilities.get_dicomfield('PhaseGradientAmplitude', dcm_file_csa) + assert value == '0.0' + + # -> CSA Image header + value = utilities.get_dicomfield('ImaCoilString', dcm_file_csa) + assert value == 'T:HEA;HEP' + + value = utilities.get_dicomfield('B_matrix', dcm_file_csa) + assert value == '' + + value = utilities.get_dicomfield('NonExistingTag', dcm_file_csa) + assert value == '' + + # -> CSA MrPhoenixProtocol + if find_spec('dicom_parser'): + value = utilities.get_dicomfield('MrPhoenixProtocol.tProtocolName', dcm_file_csa) + assert value == 'CBU+AF8-DTI+AF8-64D+AF8-1A' + + value = utilities.get_dicomfield('MrPhoenixProtocol.sDiffusion', dcm_file_csa) + assert value == "{'lDiffWeightings': 2, 'alBValue': [None, 1000], 'lNoiseLevel': 40, 'lDiffDirections': 64, 'ulMode': 256}" + + value = utilities.get_dicomfield('MrPhoenixProtocol.sProtConsistencyInfo.tBaselineString', dcm_file_csa) + assert value == 'N4_VB17A_LATEST_20090307' + + def test_dicomsort(tmp_path): shutil.copytree(Path(get_testdata_file('DICOMDIR')).parent, tmp_path, dirs_exist_ok=True) session = sorted(dicomsort.sortsessions(tmp_path/'DICOMDIR', None, folderscheme='{SeriesNumber:04d}-{SeriesDescription}', namescheme='{SeriesNumber:02d}_{SeriesDescription}_{AcquisitionNumber}_{InstanceNumber}.IMA', force=True))