Skip to content

Commit

Permalink
Merge pull request #23 from LalithShiyam/compression-fix
Browse files Browse the repository at this point in the history
Code clean up and optional header transfer
  • Loading branch information
LalithShiyam authored Mar 13, 2024
2 parents 2bedccc + b345773 commit d512fcf
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 52 deletions.
3 changes: 3 additions & 0 deletions nifti2dicom/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@
ANSI_RESET = '\033[0m'


TAGS_TO_EXCLUDE = ["Pixel Data", "Image Index", "Number of Slices", "Rows", "Columns", 'Pixel Spacing',
"Image Position (Patient)", "Image Orientation (Patient)", "Instance Number", "Slice Thickness",
"Slice Location"]

122 changes: 71 additions & 51 deletions nifti2dicom/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,44 @@

import os
import glob
import pydicom
import json
import shutil
import nibabel as nib
import numpy as np
from rich.progress import Progress, track
from concurrent.futures import ThreadPoolExecutor
from nifti2dicom.constants import ANSI_ORANGE, ANSI_GREEN, ANSI_VIOLET, ANSI_RESET
import emoji
import highdicom as hd
import pydicom
from pydicom.sr.codedict import codes
from pydicom.dataset import Dataset
from datetime import datetime
from nifti2dicom.display import display_welcome_message
import json
from nifti2dicom.constants import ANSI_ORANGE, ANSI_GREEN, ANSI_VIOLET, ANSI_RESET, TAGS_TO_EXCLUDE


def check_directory_exists(directory_path: str) -> None:
def check_directory_exists(directory: str) -> None:
"""
Checks if the specified directory exists.
:param directory_path: The path to the directory.
:type directory_path: str
:param directory: The path to the directory.
:type directory: str
:raises: Exception if the directory does not exist.
"""
if not os.path.isdir(directory_path):
raise FileNotFoundError(f"Error: The directory '{directory_path}' does not exist.")
if not os.path.isdir(directory):
raise FileNotFoundError(f"Error: The directory '{directory}' does not exist.")


def is_dicom_file(filename):
def is_dicom_file(file_path) -> bool:
try:
pydicom.dcmread(filename)
pydicom.dcmread(file_path)
return True
except pydicom.errors.InvalidDicomError:
return False


def is_dicom_compressed(dataset) -> bool:
def is_dicom_compressed(dicom_dataset) -> bool:
try:
if 'PixelData' in dataset:
transfer_syntax = dataset.file_meta.TransferSyntaxUID
if 'PixelData' in dicom_dataset:
transfer_syntax = dicom_dataset.file_meta.TransferSyntaxUID
uncompressed_syntaxes = [
pydicom.uid.ExplicitVRLittleEndian,
pydicom.uid.ImplicitVRLittleEndian,
Expand All @@ -66,26 +66,25 @@ def is_dicom_compressed(dataset) -> bool:
return False


def load_reference_dicom_series(directory_path: str) -> tuple:
def load_dicom_series(directory: str) -> tuple:
"""
Loads a DICOM series from a directory.
:param directory_path: The path to the directory containing the DICOM series.
:type directory_path: str
:param directory: The path to the directory containing the DICOM series.
:type directory: str
:return: A tuple containing the slices and filenames of the DICOM series.
:rtype: tuple
"""
files = [f for f in glob.glob(os.path.join(directory_path, '*')) if is_dicom_file(f) and not os.path.basename(f).startswith('.')]
files = [f for f in glob.glob(os.path.join(directory, '*')) if is_dicom_file(f) and not os.path.basename(f).startswith('.')]
slices = [pydicom.dcmread(s) for s in files]
slices_and_names = sorted(zip(slices, files), key=lambda s: s[0].InstanceNumber)
return zip(*slices_and_names)


def save_slice(slice_data, normalized_data, series_description, filename, output_dir, modality, vendor):
def save_slice(slice_data, normalized_data, series_description, filename, output_dir, modality, reference_header_data=None):
"""
Save a DICOM slice to a file.
:param slice_data: DICOM slice data
:type slice_data: Dataset
:type slice_data: pydicom.dataset.Dataset
:param normalized_data: Normalized data from the NIfTI image
:type normalized_data: numpy.ndarray
:param series_description: Description of the series
Expand All @@ -96,8 +95,8 @@ def save_slice(slice_data, normalized_data, series_description, filename, output
:type output_dir: str
:param modality: Modality of the image (CT or PT)
:type modality: str
:param vendor: Vendor from which the DICOM series was obtained (ux or sms)
:type vendor: str
:param reference_header_data: Modality of the image (CT or PT)
:type reference_header_data: pydicom.dataset.Dataset
:return: None
"""
if is_dicom_compressed(slice_data):
Expand All @@ -115,17 +114,28 @@ def save_slice(slice_data, normalized_data, series_description, filename, output
slice_data.RescaleSlope = max_value / 65535
slice_data.RescaleIntercept = 0
# Reverse the rescaling to get back to the original stored values
slice_data.PixelData = (normalized_data - float(slice_data.RescaleIntercept)) / float(slice_data.RescaleSlope)
slice_data.PixelData = (normalized_data - float(slice_data.RescaleIntercept)) / float(slice_data.RescaleSlope)
else:
raise ValueError(f"Unknown modality: {modality}")
slice_data.PixelData = slice_data.PixelData.astype(np.int16).tobytes()

if reference_header_data is not None:
for tag in slice_data:
if tag.tag not in reference_header_data:
del slice_data[tag]

for tag in reference_header_data:
parameter_tag_name = tag.name
if parameter_tag_name not in TAGS_TO_EXCLUDE:
slice_data[tag.tag].value = tag.value

slice_data.SeriesNumber *= 10
if slice_data.SeriesDescription:
slice_data.SeriesDescription = slice_data.SeriesDescription + '_' + series_description
slice_data.save_as(os.path.join(output_dir, os.path.basename(filename)))


def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_description, vendor, force_overwrite=False):
def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, vendor="ux", series_description="converted by nifti2dicom", header_dir=None, force_overwrite=False):
"""
Convert a NIfTI image to a DICOM series.
:param ref_dir: DICOM series directory which serves as a reference for the conversion
Expand All @@ -138,6 +148,8 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript
:type series_description: str
:param vendor: The vendor from which the DICOM series was obtained (ux or sms)
:type vendor: str
:param header_dir: The path to the header reference directory
:type header_dir: str
:param force_overwrite: Force overwrite of the output directory if it already exists
:type force_overwrite: bool
:return:
Expand All @@ -153,7 +165,7 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript
print(f' {ANSI_GREEN}* Loading NIfTI image: {nifti_path}{ANSI_RESET}')

# if the vendor is sms or ux and a 3d image use the following
if vendor in ['sms', 'ux'] and num_dims == 3:
if num_dims == 3:
image_data = np.flip(image_data, (1, 2))
image_data = image_data.T
image_data = image_data.reshape((-1,) + image_data.shape[-2:])
Expand All @@ -171,8 +183,16 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript
else:
raise ValueError(f"Unknown vendor: {vendor}")

print(f' {ANSI_GREEN}* Reference DICOM series directory: {ref_dir}{ANSI_RESET}')
dicom_slices, filenames = load_reference_dicom_series(ref_dir)
header_slice_data = None
if header_dir is not None:
print(f' {ANSI_GREEN}* Header data will be copied from: {header_dir}{ANSI_RESET}')
print(f' {ANSI_GREEN}* Spatial information will be taken from: {ref_dir}{ANSI_RESET}')
parameter_dicom_slices, _ = load_dicom_series(header_dir)
header_slice_data = parameter_dicom_slices[0]
else:
print(f' {ANSI_GREEN}* Reference DICOM series directory: {ref_dir}{ANSI_RESET}')

dicom_slices, filenames = load_dicom_series(ref_dir)
reference_slice = dicom_slices[0]
if is_dicom_compressed(reference_slice):
print(f' {ANSI_ORANGE}* DICOM is compressed. Will decompress to convert.{ANSI_RESET}')
Expand Down Expand Up @@ -205,7 +225,7 @@ def save_dicom_from_nifti_image(ref_dir, nifti_path, output_dir, series_descript
normalized_data = image_data[idx]
futures.append(
executor.submit(save_slice, slice_data, normalized_data, series_description, filename, output_dir,
modality, vendor))
modality, header_slice_data))

for idx, future in enumerate(futures):
future.result()
Expand Down Expand Up @@ -239,7 +259,6 @@ def save_dicom_from_nifti_seg(nifti_file: str, ref_dicom_series_dir: str, output
multilabel_mask = multilabel_mask.reshape((-1,) + multilabel_mask.shape[-2:])

# Generate segment descriptions based on labels in the mask
labels = np.unique(multilabel_mask)[1:]
segment_descriptions = []
for label, organ_name in track(ORGAN_INDEX.items(), description="[cyan] Processing segments...",
total=len(ORGAN_INDEX)):
Expand Down Expand Up @@ -283,20 +302,22 @@ def main():
import argparse

parser = argparse.ArgumentParser(description="Convert NIfTI images to DICOM format using a reference DICOM series.")
parser.add_argument("-d", "--dicom_dir", type=str, help="Path to the directory containing the reference DICOM "
"series.")
parser.add_argument("-n", "--nifti_path", type=str, help="Path to the NIfTI file to be converted.")
parser.add_argument("-o", "--output_dir", type=str, help="Path to the directory where the converted DICOM files "
"will be saved.")
parser.add_argument("-desc", "--series_description", type=str, help="Series description to be added to the DICOM header.")
parser.add_argument("-t", "--type", type=str, choices=['img','seg'], help=("Are you converting an image or a segmentation?."))
# vendor is optional because it is not needed for the segmentation conversion

parser.add_argument("-d", "--dicom_dir", type=str, required=True,
help="Path to the directory containing the reference DICOM series.")
parser.add_argument("-hd", "--header_source_dicom_dir", type=str, required=False, default=None,
help="Path to the directory containing the header reference DICOM series.")
parser.add_argument("-n", "--nifti_path", type=str, required=True,
help="Path to the NIfTI file to be converted.")
parser.add_argument("-o", "--output_dir", type=str, required=True,
help="Path to the directory where the converted DICOM files will be saved.")
parser.add_argument("-desc", "--series_description", required=False, default='converted by nifti2dicom',
type=str, help="Series description to be added to the DICOM header.")
parser.add_argument("-t", "--type", type=str, choices=['img', 'seg'], required=True,
help="Are you converting an image or a segmentation?")
parser.add_argument("-v", "--vendor", type=str, choices=['sms', 'ux'], required=False, default='ux',
help="Vendor of the reference DICOM series.")
parser.add_argument("-j", "--json", type=str, help=f"Path to the JSON file containing the label to region index. ")


help="Vendor of the reference DICOM series. Only needed for 4D images.")
parser.add_argument("-j", "--json", type=str,
help=f"Path to the JSON file containing the label to region index. ")

# Parse the arguments
args = parser.parse_args()
Expand All @@ -305,17 +326,16 @@ def main():
display_welcome_message()

# Check the type of conversion

if args.type == 'seg' and args.json: # if the type is segmentation and the json file is provided
if args.type == 'seg' and args.json:
if not os.path.isdir(args.output_dir):
os.makedirs(args.output_dir)
with open(args.json, 'r') as f:
organ_index = json.load(f)
save_dicom_from_nifti_seg(args.nifti_path, args.dicom_dir, args.output_dir, organ_index)
elif args.type == 'seg' and not args.json: # if the type is segmentation and the json file is not provided
elif args.type == 'seg' and not args.json:
raise ValueError(f"Please provide a JSON file containing the label to region index.")
elif args.type == 'img': # if the type is image
save_dicom_from_nifti_image(args.dicom_dir, args.nifti_path, args.output_dir, args.series_description, args.vendor)

elif args.type == 'img':
save_dicom_from_nifti_image(args.dicom_dir, args.nifti_path, args.output_dir, args.vendor, args.series_description, args.header_source_dicom_dir)
else:
raise ValueError(f"Unknown type: {args.type}")
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='nifti2dicom',
version='1.1.4',
version='1.1.5',
packages=find_packages(),
install_requires=[
'pydicom',
Expand Down

0 comments on commit d512fcf

Please sign in to comment.