diff --git a/nifti2dicom/constants.py b/nifti2dicom/constants.py index 6ba2357..18a0889 100644 --- a/nifti2dicom/constants.py +++ b/nifti2dicom/constants.py @@ -20,4 +20,7 @@ ANSI_RESET = '\033[0m' +TAGS_TO_EXCLUDE = ["Pixel Data", "Image Index", "Number of Slices", "Rows", "Columns", 'Pixel Spacing', + "Image Position (Patient)", "Image Orientation (Patient)", "Instance Number", "Slice Thickness", + "Slice Location"] diff --git a/nifti2dicom/converter.py b/nifti2dicom/converter.py index 02b863d..672deb5 100644 --- a/nifti2dicom/converter.py +++ b/nifti2dicom/converter.py @@ -14,44 +14,44 @@ import os import glob -import pydicom +import json +import shutil import nibabel as nib import numpy as np from rich.progress import Progress, track from concurrent.futures import ThreadPoolExecutor -from nifti2dicom.constants import ANSI_ORANGE, ANSI_GREEN, ANSI_VIOLET, ANSI_RESET import emoji import highdicom as hd +import pydicom from pydicom.sr.codedict import codes -from pydicom.dataset import Dataset from datetime import datetime from nifti2dicom.display import display_welcome_message -import json +from nifti2dicom.constants import ANSI_ORANGE, ANSI_GREEN, ANSI_VIOLET, ANSI_RESET, TAGS_TO_EXCLUDE + -def check_directory_exists(directory_path: str) -> None: +def check_directory_exists(directory: str) -> None: """ - Checks if the specified directory exists. - - :param directory_path: The path to the directory. - :type directory_path: str + :param directory: The path to the directory. + :type directory: str :raises: Exception if the directory does not exist. """ - if not os.path.isdir(directory_path): - raise FileNotFoundError(f"Error: The directory '{directory_path}' does not exist.") + if not os.path.isdir(directory): + raise FileNotFoundError(f"Error: The directory '{directory}' does not exist.") + -def is_dicom_file(filename): +def is_dicom_file(file_path) -> bool: try: - pydicom.dcmread(filename) + pydicom.dcmread(file_path) return True except pydicom.errors.InvalidDicomError: return False -def is_dicom_compressed(dataset) -> bool: +def is_dicom_compressed(dicom_dataset) -> bool: try: - if 'PixelData' in dataset: - transfer_syntax = dataset.file_meta.TransferSyntaxUID + if 'PixelData' in dicom_dataset: + transfer_syntax = dicom_dataset.file_meta.TransferSyntaxUID uncompressed_syntaxes = [ pydicom.uid.ExplicitVRLittleEndian, pydicom.uid.ImplicitVRLittleEndian, @@ -66,26 +66,25 @@ def is_dicom_compressed(dataset) -> bool: return False -def load_reference_dicom_series(directory_path: str) -> tuple: +def load_dicom_series(directory: str) -> tuple: """ Loads a DICOM series from a directory. - - :param directory_path: The path to the directory containing the DICOM series. - :type directory_path: str + :param directory: The path to the directory containing the DICOM series. + :type directory: str :return: A tuple containing the slices and filenames of the DICOM series. :rtype: tuple """ - files = [f for f in glob.glob(os.path.join(directory_path, '*')) if is_dicom_file(f) and not os.path.basename(f).startswith('.')] + files = [f for f in glob.glob(os.path.join(directory, '*')) if is_dicom_file(f) and not os.path.basename(f).startswith('.')] slices = [pydicom.dcmread(s) for s in files] slices_and_names = sorted(zip(slices, files), key=lambda s: s[0].InstanceNumber) return zip(*slices_and_names) -def save_slice(slice_data, normalized_data, series_description, filename, output_dir, modality, vendor): +def save_slice(slice_data, normalized_data, series_description, filename, output_dir, modality, reference_header_data=None): """ Save a DICOM slice to a file. :param slice_data: DICOM slice data - :type slice_data: Dataset + :type slice_data: pydicom.dataset.Dataset :param normalized_data: Normalized data from the NIfTI image :type normalized_data: numpy.ndarray :param series_description: Description of the series @@ -96,8 +95,8 @@ def save_slice(slice_data, normalized_data, series_description, filename, output :type output_dir: str :param modality: Modality of the image (CT or PT) :type modality: str - :param vendor: Vendor from which the DICOM series was obtained (ux or sms) - :type vendor: str + :param reference_header_data: Modality of the image (CT or PT) + :type reference_header_data: pydicom.dataset.Dataset :return: None """ if is_dicom_compressed(slice_data): @@ -115,17 +114,28 @@ def save_slice(slice_data, normalized_data, series_description, filename, output slice_data.RescaleSlope = max_value / 65535 slice_data.RescaleIntercept = 0 # Reverse the rescaling to get back to the original stored values - slice_data.PixelData = (normalized_data - float(slice_data.RescaleIntercept)) / float(slice_data.RescaleSlope) + slice_data.PixelData = (normalized_data - float(slice_data.RescaleIntercept)) / float(slice_data.RescaleSlope) else: raise ValueError(f"Unknown modality: {modality}") slice_data.PixelData = slice_data.PixelData.astype(np.int16).tobytes() + + if reference_header_data is not None: + for tag in slice_data: + if tag.tag not in reference_header_data: + del slice_data[tag] + + for tag in reference_header_data: + parameter_tag_name = tag.name + if parameter_tag_name not in TAGS_TO_EXCLUDE: + slice_data[tag.tag].value = tag.value + slice_data.SeriesNumber *= 10 if slice_data.SeriesDescription: slice_data.SeriesDescription = slice_data.SeriesDescription + '_' + series_description slice_data.save_as(os.path.join(output_dir, os.path.basename(filename))) -def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_description, vendor, force_overwrite=False): +def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, vendor="ux", series_description="converted by nifti2dicom", header_dir=None, force_overwrite=False): """ Convert a NIfTI image to a DICOM series. :param ref_dir: DICOM series directory which serves as a reference for the conversion @@ -138,6 +148,8 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript :type series_description: str :param vendor: The vendor from which the DICOM series was obtained (ux or sms) :type vendor: str + :param header_dir: The path to the header reference directory + :type header_dir: str :param force_overwrite: Force overwrite of the output directory if it already exists :type force_overwrite: bool :return: @@ -153,7 +165,7 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript print(f' {ANSI_GREEN}* Loading NIfTI image: {nifti_path}{ANSI_RESET}') # if the vendor is sms or ux and a 3d image use the following - if vendor in ['sms', 'ux'] and num_dims == 3: + if num_dims == 3: image_data = np.flip(image_data, (1, 2)) image_data = image_data.T image_data = image_data.reshape((-1,) + image_data.shape[-2:]) @@ -171,8 +183,16 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript else: raise ValueError(f"Unknown vendor: {vendor}") - print(f' {ANSI_GREEN}* Reference DICOM series directory: {ref_dir}{ANSI_RESET}') - dicom_slices, filenames = load_reference_dicom_series(ref_dir) + header_slice_data = None + if header_dir is not None: + print(f' {ANSI_GREEN}* Header data will be copied from: {header_dir}{ANSI_RESET}') + print(f' {ANSI_GREEN}* Spatial information will be taken from: {ref_dir}{ANSI_RESET}') + parameter_dicom_slices, _ = load_dicom_series(header_dir) + header_slice_data = parameter_dicom_slices[0] + else: + print(f' {ANSI_GREEN}* Reference DICOM series directory: {ref_dir}{ANSI_RESET}') + + dicom_slices, filenames = load_dicom_series(ref_dir) reference_slice = dicom_slices[0] if is_dicom_compressed(reference_slice): print(f' {ANSI_ORANGE}* DICOM is compressed. Will decompress to convert.{ANSI_RESET}') @@ -205,7 +225,7 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript normalized_data = image_data[idx] futures.append( executor.submit(save_slice, slice_data, normalized_data, series_description, filename, output_dir, - modality, vendor)) + modality, header_slice_data)) for idx, future in enumerate(futures): future.result() @@ -239,7 +259,6 @@ def save_dicom_from_nifti_seg(nifti_file: str, ref_dicom_series_dir: str, output multilabel_mask = multilabel_mask.reshape((-1,) + multilabel_mask.shape[-2:]) # Generate segment descriptions based on labels in the mask - labels = np.unique(multilabel_mask)[1:] segment_descriptions = [] for label, organ_name in track(ORGAN_INDEX.items(), description="[cyan] Processing segments...", total=len(ORGAN_INDEX)): @@ -283,20 +302,22 @@ def main(): import argparse parser = argparse.ArgumentParser(description="Convert NIfTI images to DICOM format using a reference DICOM series.") - parser.add_argument("-d", "--dicom_dir", type=str, help="Path to the directory containing the reference DICOM " - "series.") - parser.add_argument("-n", "--nifti_path", type=str, help="Path to the NIfTI file to be converted.") - parser.add_argument("-o", "--output_dir", type=str, help="Path to the directory where the converted DICOM files " - "will be saved.") - parser.add_argument("-desc", "--series_description", type=str, help="Series description to be added to the DICOM header.") - parser.add_argument("-t", "--type", type=str, choices=['img','seg'], help=("Are you converting an image or a segmentation?.")) - # vendor is optional because it is not needed for the segmentation conversion - + parser.add_argument("-d", "--dicom_dir", type=str, required=True, + help="Path to the directory containing the reference DICOM series.") + parser.add_argument("-hd", "--header_source_dicom_dir", type=str, required=False, default=None, + help="Path to the directory containing the header reference DICOM series.") + parser.add_argument("-n", "--nifti_path", type=str, required=True, + help="Path to the NIfTI file to be converted.") + parser.add_argument("-o", "--output_dir", type=str, required=True, + help="Path to the directory where the converted DICOM files will be saved.") + parser.add_argument("-desc", "--series_description", required=False, default='converted by nifti2dicom', + type=str, help="Series description to be added to the DICOM header.") + parser.add_argument("-t", "--type", type=str, choices=['img', 'seg'], required=True, + help="Are you converting an image or a segmentation?") parser.add_argument("-v", "--vendor", type=str, choices=['sms', 'ux'], required=False, default='ux', - help="Vendor of the reference DICOM series.") - parser.add_argument("-j", "--json", type=str, help=f"Path to the JSON file containing the label to region index. ") - - + help="Vendor of the reference DICOM series. Only needed for 4D images.") + parser.add_argument("-j", "--json", type=str, + help=f"Path to the JSON file containing the label to region index. ") # Parse the arguments args = parser.parse_args() @@ -305,17 +326,16 @@ def main(): display_welcome_message() # Check the type of conversion - - if args.type == 'seg' and args.json: # if the type is segmentation and the json file is provided + if args.type == 'seg' and args.json: if not os.path.isdir(args.output_dir): os.makedirs(args.output_dir) with open(args.json, 'r') as f: organ_index = json.load(f) save_dicom_from_nifti_seg(args.nifti_path, args.dicom_dir, args.output_dir, organ_index) - elif args.type == 'seg' and not args.json: # if the type is segmentation and the json file is not provided + elif args.type == 'seg' and not args.json: raise ValueError(f"Please provide a JSON file containing the label to region index.") - - elif args.type == 'img': # if the type is image - save_dicom_from_nifti_image(args.dicom_dir, args.nifti_path, args.output_dir, args.series_description, args.vendor) + + elif args.type == 'img': + save_dicom_from_nifti_image(args.dicom_dir, args.nifti_path, args.output_dir, args.vendor, args.series_description, args.header_source_dicom_dir) else: raise ValueError(f"Unknown type: {args.type}") diff --git a/setup.py b/setup.py index 5524ef9..09793e7 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='nifti2dicom', - version='1.1.4', + version='1.1.5', packages=find_packages(), install_requires=[ 'pydicom',