From b345773420c4c400be710268b1b10b53c7509260 Mon Sep 17 00:00:00 2001 From: Keyn34 Date: Wed, 13 Mar 2024 18:48:11 +0100 Subject: [PATCH] Code clean up converter.py - cleaned up code and improved readability. This encompasses formatting fixes and removing unneeded code segments such as the determination of labels in save_dicom_from_nifti_seg() and and the unused attribute vendor in save_slice(). The vendor check for 3D images was also removed as this always evaluated to True in the implementation and was not determinsitic for the behaviour of the code. - improved variable and attribute names - renamed load_reference_dicom_series() to load_dicom_series() - added an optional parameter to save_slice() and save_dicom_from_nifti_image() to copy header information from a second dicom series to the header of the reference series. This is useful when processing on the nifti image was done and the reference header contains the spatial information, wheres the second header contains the actual study information. setup.py - raised version to 1.1.5 constants.py - added TAGS_TO_EXCLUDE to constants.py. These tags will not be copied to the spatial reference header. --- nifti2dicom/constants.py | 3 + nifti2dicom/converter.py | 122 +++++++++++++++++++++++---------------- setup.py | 2 +- 3 files changed, 75 insertions(+), 52 deletions(-) diff --git a/nifti2dicom/constants.py b/nifti2dicom/constants.py index 6ba2357..18a0889 100644 --- a/nifti2dicom/constants.py +++ b/nifti2dicom/constants.py @@ -20,4 +20,7 @@ ANSI_RESET = '\033[0m' +TAGS_TO_EXCLUDE = ["Pixel Data", "Image Index", "Number of Slices", "Rows", "Columns", 'Pixel Spacing', + "Image Position (Patient)", "Image Orientation (Patient)", "Instance Number", "Slice Thickness", + "Slice Location"] diff --git a/nifti2dicom/converter.py b/nifti2dicom/converter.py index 02b863d..672deb5 100644 --- a/nifti2dicom/converter.py +++ b/nifti2dicom/converter.py @@ -14,44 +14,44 @@ import os import glob -import pydicom +import json +import shutil import nibabel as nib import numpy as np from rich.progress import Progress, track from concurrent.futures import ThreadPoolExecutor -from nifti2dicom.constants import ANSI_ORANGE, ANSI_GREEN, ANSI_VIOLET, ANSI_RESET import emoji import highdicom as hd +import pydicom from pydicom.sr.codedict import codes -from pydicom.dataset import Dataset from datetime import datetime from nifti2dicom.display import display_welcome_message -import json +from nifti2dicom.constants import ANSI_ORANGE, ANSI_GREEN, ANSI_VIOLET, ANSI_RESET, TAGS_TO_EXCLUDE + -def check_directory_exists(directory_path: str) -> None: +def check_directory_exists(directory: str) -> None: """ - Checks if the specified directory exists. - - :param directory_path: The path to the directory. - :type directory_path: str + :param directory: The path to the directory. + :type directory: str :raises: Exception if the directory does not exist. """ - if not os.path.isdir(directory_path): - raise FileNotFoundError(f"Error: The directory '{directory_path}' does not exist.") + if not os.path.isdir(directory): + raise FileNotFoundError(f"Error: The directory '{directory}' does not exist.") + -def is_dicom_file(filename): +def is_dicom_file(file_path) -> bool: try: - pydicom.dcmread(filename) + pydicom.dcmread(file_path) return True except pydicom.errors.InvalidDicomError: return False -def is_dicom_compressed(dataset) -> bool: +def is_dicom_compressed(dicom_dataset) -> bool: try: - if 'PixelData' in dataset: - transfer_syntax = dataset.file_meta.TransferSyntaxUID + if 'PixelData' in dicom_dataset: + transfer_syntax = dicom_dataset.file_meta.TransferSyntaxUID uncompressed_syntaxes = [ pydicom.uid.ExplicitVRLittleEndian, pydicom.uid.ImplicitVRLittleEndian, @@ -66,26 +66,25 @@ def is_dicom_compressed(dataset) -> bool: return False -def load_reference_dicom_series(directory_path: str) -> tuple: +def load_dicom_series(directory: str) -> tuple: """ Loads a DICOM series from a directory. - - :param directory_path: The path to the directory containing the DICOM series. - :type directory_path: str + :param directory: The path to the directory containing the DICOM series. + :type directory: str :return: A tuple containing the slices and filenames of the DICOM series. :rtype: tuple """ - files = [f for f in glob.glob(os.path.join(directory_path, '*')) if is_dicom_file(f) and not os.path.basename(f).startswith('.')] + files = [f for f in glob.glob(os.path.join(directory, '*')) if is_dicom_file(f) and not os.path.basename(f).startswith('.')] slices = [pydicom.dcmread(s) for s in files] slices_and_names = sorted(zip(slices, files), key=lambda s: s[0].InstanceNumber) return zip(*slices_and_names) -def save_slice(slice_data, normalized_data, series_description, filename, output_dir, modality, vendor): +def save_slice(slice_data, normalized_data, series_description, filename, output_dir, modality, reference_header_data=None): """ Save a DICOM slice to a file. :param slice_data: DICOM slice data - :type slice_data: Dataset + :type slice_data: pydicom.dataset.Dataset :param normalized_data: Normalized data from the NIfTI image :type normalized_data: numpy.ndarray :param series_description: Description of the series @@ -96,8 +95,8 @@ def save_slice(slice_data, normalized_data, series_description, filename, output :type output_dir: str :param modality: Modality of the image (CT or PT) :type modality: str - :param vendor: Vendor from which the DICOM series was obtained (ux or sms) - :type vendor: str + :param reference_header_data: Modality of the image (CT or PT) + :type reference_header_data: pydicom.dataset.Dataset :return: None """ if is_dicom_compressed(slice_data): @@ -115,17 +114,28 @@ def save_slice(slice_data, normalized_data, series_description, filename, output slice_data.RescaleSlope = max_value / 65535 slice_data.RescaleIntercept = 0 # Reverse the rescaling to get back to the original stored values - slice_data.PixelData = (normalized_data - float(slice_data.RescaleIntercept)) / float(slice_data.RescaleSlope) + slice_data.PixelData = (normalized_data - float(slice_data.RescaleIntercept)) / float(slice_data.RescaleSlope) else: raise ValueError(f"Unknown modality: {modality}") slice_data.PixelData = slice_data.PixelData.astype(np.int16).tobytes() + + if reference_header_data is not None: + for tag in slice_data: + if tag.tag not in reference_header_data: + del slice_data[tag] + + for tag in reference_header_data: + parameter_tag_name = tag.name + if parameter_tag_name not in TAGS_TO_EXCLUDE: + slice_data[tag.tag].value = tag.value + slice_data.SeriesNumber *= 10 if slice_data.SeriesDescription: slice_data.SeriesDescription = slice_data.SeriesDescription + '_' + series_description slice_data.save_as(os.path.join(output_dir, os.path.basename(filename))) -def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_description, vendor, force_overwrite=False): +def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, vendor="ux", series_description="converted by nifti2dicom", header_dir=None, force_overwrite=False): """ Convert a NIfTI image to a DICOM series. :param ref_dir: DICOM series directory which serves as a reference for the conversion @@ -138,6 +148,8 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript :type series_description: str :param vendor: The vendor from which the DICOM series was obtained (ux or sms) :type vendor: str + :param header_dir: The path to the header reference directory + :type header_dir: str :param force_overwrite: Force overwrite of the output directory if it already exists :type force_overwrite: bool :return: @@ -153,7 +165,7 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript print(f' {ANSI_GREEN}* Loading NIfTI image: {nifti_path}{ANSI_RESET}') # if the vendor is sms or ux and a 3d image use the following - if vendor in ['sms', 'ux'] and num_dims == 3: + if num_dims == 3: image_data = np.flip(image_data, (1, 2)) image_data = image_data.T image_data = image_data.reshape((-1,) + image_data.shape[-2:]) @@ -171,8 +183,16 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript else: raise ValueError(f"Unknown vendor: {vendor}") - print(f' {ANSI_GREEN}* Reference DICOM series directory: {ref_dir}{ANSI_RESET}') - dicom_slices, filenames = load_reference_dicom_series(ref_dir) + header_slice_data = None + if header_dir is not None: + print(f' {ANSI_GREEN}* Header data will be copied from: {header_dir}{ANSI_RESET}') + print(f' {ANSI_GREEN}* Spatial information will be taken from: {ref_dir}{ANSI_RESET}') + parameter_dicom_slices, _ = load_dicom_series(header_dir) + header_slice_data = parameter_dicom_slices[0] + else: + print(f' {ANSI_GREEN}* Reference DICOM series directory: {ref_dir}{ANSI_RESET}') + + dicom_slices, filenames = load_dicom_series(ref_dir) reference_slice = dicom_slices[0] if is_dicom_compressed(reference_slice): print(f' {ANSI_ORANGE}* DICOM is compressed. Will decompress to convert.{ANSI_RESET}') @@ -205,7 +225,7 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript normalized_data = image_data[idx] futures.append( executor.submit(save_slice, slice_data, normalized_data, series_description, filename, output_dir, - modality, vendor)) + modality, header_slice_data)) for idx, future in enumerate(futures): future.result() @@ -239,7 +259,6 @@ def save_dicom_from_nifti_seg(nifti_file: str, ref_dicom_series_dir: str, output multilabel_mask = multilabel_mask.reshape((-1,) + multilabel_mask.shape[-2:]) # Generate segment descriptions based on labels in the mask - labels = np.unique(multilabel_mask)[1:] segment_descriptions = [] for label, organ_name in track(ORGAN_INDEX.items(), description="[cyan] Processing segments...", total=len(ORGAN_INDEX)): @@ -283,20 +302,22 @@ def main(): import argparse parser = argparse.ArgumentParser(description="Convert NIfTI images to DICOM format using a reference DICOM series.") - parser.add_argument("-d", "--dicom_dir", type=str, help="Path to the directory containing the reference DICOM " - "series.") - parser.add_argument("-n", "--nifti_path", type=str, help="Path to the NIfTI file to be converted.") - parser.add_argument("-o", "--output_dir", type=str, help="Path to the directory where the converted DICOM files " - "will be saved.") - parser.add_argument("-desc", "--series_description", type=str, help="Series description to be added to the DICOM header.") - parser.add_argument("-t", "--type", type=str, choices=['img','seg'], help=("Are you converting an image or a segmentation?.")) - # vendor is optional because it is not needed for the segmentation conversion - + parser.add_argument("-d", "--dicom_dir", type=str, required=True, + help="Path to the directory containing the reference DICOM series.") + parser.add_argument("-hd", "--header_source_dicom_dir", type=str, required=False, default=None, + help="Path to the directory containing the header reference DICOM series.") + parser.add_argument("-n", "--nifti_path", type=str, required=True, + help="Path to the NIfTI file to be converted.") + parser.add_argument("-o", "--output_dir", type=str, required=True, + help="Path to the directory where the converted DICOM files will be saved.") + parser.add_argument("-desc", "--series_description", required=False, default='converted by nifti2dicom', + type=str, help="Series description to be added to the DICOM header.") + parser.add_argument("-t", "--type", type=str, choices=['img', 'seg'], required=True, + help="Are you converting an image or a segmentation?") parser.add_argument("-v", "--vendor", type=str, choices=['sms', 'ux'], required=False, default='ux', - help="Vendor of the reference DICOM series.") - parser.add_argument("-j", "--json", type=str, help=f"Path to the JSON file containing the label to region index. ") - - + help="Vendor of the reference DICOM series. Only needed for 4D images.") + parser.add_argument("-j", "--json", type=str, + help=f"Path to the JSON file containing the label to region index. ") # Parse the arguments args = parser.parse_args() @@ -305,17 +326,16 @@ def main(): display_welcome_message() # Check the type of conversion - - if args.type == 'seg' and args.json: # if the type is segmentation and the json file is provided + if args.type == 'seg' and args.json: if not os.path.isdir(args.output_dir): os.makedirs(args.output_dir) with open(args.json, 'r') as f: organ_index = json.load(f) save_dicom_from_nifti_seg(args.nifti_path, args.dicom_dir, args.output_dir, organ_index) - elif args.type == 'seg' and not args.json: # if the type is segmentation and the json file is not provided + elif args.type == 'seg' and not args.json: raise ValueError(f"Please provide a JSON file containing the label to region index.") - - elif args.type == 'img': # if the type is image - save_dicom_from_nifti_image(args.dicom_dir, args.nifti_path, args.output_dir, args.series_description, args.vendor) + + elif args.type == 'img': + save_dicom_from_nifti_image(args.dicom_dir, args.nifti_path, args.output_dir, args.vendor, args.series_description, args.header_source_dicom_dir) else: raise ValueError(f"Unknown type: {args.type}") diff --git a/setup.py b/setup.py index 5524ef9..09793e7 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='nifti2dicom', - version='1.1.4', + version='1.1.5', packages=find_packages(), install_requires=[ 'pydicom',