Skip to content

Commit

Permalink
Merge pull request #1 from Epiconcept-Paris/feature/remove-dp-home
Browse files Browse the repository at this point in the history
Feature/remove dp home
  • Loading branch information
williammadie authored Jun 21, 2024
2 parents a7ed8e9 + 934da51 commit 50c0d06
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 151 deletions.
111 changes: 111 additions & 0 deletions deidcm/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Class used to define environment configuration."""

import json
import os
import warnings
from typing_extensions import Self


class Config:
    """Singleton used to configure the deidentification environment.

    This singleton object has to be instantiated for deidentification tasks.
    It allows you to define the path to a custom recipe and the path to
    an authorized_words.txt file.

    - `recipe.json`: a JSON file that contains the recipe orchestrating the
      attribute deidentification process.
    - `authorized_words.txt`: a TXT file that contains one word per line. Each
      word will be kept on the image even if it is detected by the OCR reader.
    """
    _instance = None          # the single shared instance
    _recipe = None            # parsed recipe.json content (dict)
    _authorized_words = []    # OCR-detected words that must NOT be erased

    def __new__(cls, recipe_path: str = None, authorized_words_path: str = None) -> "Config":
        """Create the single instance of Config on first call; reuse it afterwards.

        Note: `recipe_path` and `authorized_words_path` are only honored on
        the very first instantiation; later calls return the existing
        instance unchanged.

        Args:
            recipe_path: path to a user-defined recipe.json, or None to fall
                back to the package inbuilt recipe.
            authorized_words_path: path to an authorized_words.txt file, or
                None to erase every OCR-detected word.

        Returns:
            Config: The single instance of the Config class.
        """
        if cls._instance is None:
            instance = super().__new__(cls)

            # Init recipe
            cls._recipe = cls.load_recipe(recipe_path)

            # Init authorized_words
            if authorized_words_path is None:
                print(
                    "No authorized_words.txt file given. All OCR detected words will be erased.")
            else:
                cls._authorized_words = cls.load_authorized_words(
                    authorized_words_path)

            # Publish the instance only after initialization succeeded:
            # assigning cls._instance before loading (as a naive singleton
            # does) would leave a half-initialized object behind when the
            # first call raises, and every later call would silently reuse it.
            cls._instance = instance

        return cls._instance

    @classmethod
    def load_authorized_words(cls, authorized_words_filepath: str) -> list:
        """Get and load the list of authorized words from authorized_words.txt.

        This function reads `authorized_words.txt` (one word per line) and
        loads it into a Python list. If the file is not defined, the
        deidentification process will erase all detected words.

        Args:
            authorized_words_filepath: path to the authorized_words.txt file.

        Returns:
            A Python list of authorized words.

        Raises:
            FileNotFoundError: if `authorized_words_filepath` does not exist.
        """
        if not os.path.exists(authorized_words_filepath):
            raise FileNotFoundError(
                f'Cannot load {authorized_words_filepath}')
        with open(authorized_words_filepath, 'r', encoding="utf8") as f:
            words = list(map(str.strip, f.readlines()))
        return words

    @classmethod
    def load_recipe(cls, recipe_filepath: str) -> dict:
        """Get the recipe from recipe.json and load it into a python dict.

        This function reads `recipe.json`. If a user-defined version of the file
        is detected, it will be used. Otherwise, the inbuilt version of the file
        will be used. Be aware that the inbuilt version of the file does not suit
        a generic usage. It was created for the Deep.piste study. It is highly
        recommended to create your own version of `recipe.json`.

        Args:
            recipe_filepath: path to a user-defined recipe.json, or None.

        Returns:
            A Python dictionary with recipe elements.

        Raises:
            FileNotFoundError: if neither the given file nor the package
                inbuilt recipe.json can be opened.
        """
        recipe = None

        # Load user customized recipe.json file
        if recipe_filepath is None or not os.path.exists(recipe_filepath):
            print(
                f"No customized recipe.json found at path `{recipe_filepath}`. Defaulting to package inbuilt recipe.json")
        else:
            recipe = recipe_filepath

        # Load default inbuilt recipe.json file shipped with the package
        if recipe is None:
            recipe = os.path.join(os.path.dirname(
                __file__), 'dicom', 'recipe.json')

        try:
            with open(recipe, 'r', encoding="utf8") as f:
                return json.load(f)
        except FileNotFoundError as exc:
            raise FileNotFoundError(
                f"Recipe file {recipe} cannot be found.") from exc

    @property
    def recipe(self) -> dict:
        """Getter of recipe"""
        if self._recipe is None:
            raise RuntimeError("Recipe has not been initialized")
        return self._recipe

    @property
    def authorized_words(self) -> list:
        """Getter of authorized_words"""
        return self._authorized_words
73 changes: 7 additions & 66 deletions deidcm/dicom/deid_mammogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import re
import os
import uuid
import json
import base64
import string
import hashlib
Expand All @@ -19,15 +18,15 @@
from typing import List
from datetime import datetime
from datetime import timedelta
from functools import lru_cache

import pydicom
from pydicom import Dataset
import numpy as np
import pandas as pd

from PIL import Image, ImageDraw, ImageFilter
from easyocr import Reader

from deidcm.config import Config
from deidcm.dicom.dicom2df import dicom2df
from deidcm.dicom.utils import log

Expand Down Expand Up @@ -175,11 +174,6 @@ def get_text_areas(pixels: np.ndarray, languages: list = ['fr']) -> list:
try:
if ocr_data[0][2] > 0.3:
return remove_authorized_words_from(ocr_data)
except ValueError:
warnings.warn(
"Cannot load authorized words file. To suppress this warning, \
please create an empty ocr_deid_ignore.txt", RuntimeWarning)
return ocr_data
# If ocr_data is empty, trying to access for
# checking level of confidence will raise IndexError
except IndexError:
Expand All @@ -199,33 +193,19 @@ def remove_authorized_words_from(ocr_data: list) -> list:
Returns:
The same list of words and coordinates minus the authorized words elements.
"""
authorized_words = load_authorized_words()
config = Config()
if ocr_data is None:
filtered_ocr_data = ocr_data
else:
filtered_ocr_data = []
for data in ocr_data:
if data[1].upper() in authorized_words:
if data[1].upper() in config.authorized_words:
log(f'Ignoring word {data[1].upper()}')
else:
filtered_ocr_data.append(data)
return filtered_ocr_data


@lru_cache(maxsize=1)
def load_authorized_words() -> list:
home_folder = os.environ.get('DP_HOME')
if home_folder is None:
raise ValueError('cannot load DP_HOME')
filepath = os.path.join(home_folder, 'data', 'input',
'epiconcept', 'ocr_deid_ignore.txt')
if not os.path.exists(filepath):
raise FileNotFoundError(f'Cannot load {filepath}')
with open(filepath, 'r', encoding="utf8") as f:
words = list(map(str.strip, f.readlines()))
return words


def hide_text(pixels: np.ndarray, ocr_data: list, color_value: str = "black", mode: str = "rectangle", margin=300) -> np.ndarray:
"""Censor text present on the pixels array representing an image.
Expand Down Expand Up @@ -309,69 +289,30 @@ def deidentify_attributes(indir: str, outdir: str, org_root: str, erase_outdir:
Returns:
A Pandas dataframe containing all metadata/attributes information.
"""
if False in list(map(lambda x: os.path.exists(x), [indir, outdir])):
if False in list(map(os.path.exists, [indir, outdir])):
raise ValueError(f"Path {indir} or {outdir} does not exist.")

if erase_outdir:
for file in os.listdir(outdir):
os.remove(os.path.join(outdir, file))

df = dicom2df(indir)
recipe = load_recipe()

config = Config()
for file in df.index:
for attribute in df.columns:
value = df[attribute][file]
if attribute != 'FilePath':
df.loc[file, attribute] = apply_deidentification(
attribute,
value,
recipe,
config.recipe,
org_root
)
df['PatientIdentityRemoved_0x00120062_CS_1____'] = 'YES'
return df


@lru_cache(maxsize=1)
def load_recipe() -> dict:
"""Get the recipe from recipe.json and load it into a python dict.
This function reads `recipe.json`. If a user-defined version of the file
is detected inside `$DP_HOME/data/input`, it will be used. Otherwise, the
inbuilt version of the file will be used.
Be aware that the inbuilt version of the file does not suit a generic usage.
It was created for the Deep.piste study. It is highly recommended to create
your own version of `recipe.json`.
Returns:
A Python dictionary with recipe elements.
"""
recipe = None

# Loads user customized recipe.json file
home_folder = os.environ.get('DP_HOME')
if home_folder is None:
raise ValueError('cannot load DP_HOME')
filepath = os.path.join(home_folder, 'data', 'input', 'recipe.json')
if not os.path.exists(filepath):
log(
f"WARNING: No customized recipe.json found at {filepath}. Defaulting to package inbuilt recipe.json")
else:
recipe = filepath

# Loads default inbuilt recipe.json file
if recipe is None:
recipe = os.path.join(os.path.dirname(__file__), 'recipe.json')

try:
with open(recipe, 'r') as f:
return json.load(f)
except FileNotFoundError:
raise ValueError(f"Recipe file {recipe} cannot be found.")


def get_id(id_attribute):
"""reformats the id stored as a string 0xYYYYZZZZ to a tuple"""
y_id = '0x' + id_attribute[6:len(id_attribute)]
Expand Down
31 changes: 8 additions & 23 deletions deidcm/dicom/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import pydicom
from PIL import Image

def write_all_ds(indir: str, outdir: str, silent: bool=False) -> None:

def write_all_ds(indir: str, outdir: str, silent: bool = False) -> None:
"""Writes the ds of all the dicom in a folder"""
nb_files = len(os.listdir(indir))
counter = 0
Expand All @@ -19,7 +20,7 @@ def write_all_ds(indir: str, outdir: str, silent: bool=False) -> None:
def write1ds(file: str, outdir: str) -> None:
"""Writes the ds of a given dicom file"""
ds = pydicom.dcmread(file)
with open(os.path.join(outdir, f"{os.path.basename(file)[:-4]}.txt"),'w') as f:
with open(os.path.join(outdir, f"{os.path.basename(file)[:-4]}.txt"), 'w') as f:
f.write(str(ds))


Expand Down Expand Up @@ -47,7 +48,7 @@ def show_series(indir: str, tag: str) -> None:
files[element.value].append(file)
except Exception:
files[element.value] = [file]

for key, value in dict(Counter(series)).items():
print(f"{key} appears {value} time(s)")
if value == 1:
Expand All @@ -66,7 +67,7 @@ def d() -> str:
return f'{now}'


def log(txt: Union[str, list], logtype: int=0) -> None:
def log(txt: Union[str, list], logtype: int = 0) -> None:
if logtype == 1:
logtype = ' (WARNING) '
elif logtype == 2:
Expand All @@ -76,7 +77,7 @@ def log(txt: Union[str, list], logtype: int=0) -> None:
if type(txt) == str:
print(f'{d()}{logtype}{txt}')
else:
f = lambda x: print(f'{d()}{logtype}{x}')
def f(x): return print(f'{d()}{logtype}{x}')
list(map(f, txt))
sys.stdout.flush()

Expand All @@ -85,23 +86,7 @@ def reduce_PIL_img_size(im: Image, reduce_factor: int, verbose: bool) -> Image:
"""Reduce the size of an image by dividing with the given factor"""
width, height = im.size
print(f"Size before reducing: {im.size}") if verbose else None
im.thumbnail((width/reduce_factor, height/reduce_factor), Image.ANTIALIAS)
im.thumbnail((width/reduce_factor, height/reduce_factor),
Image.Resampling.LANCZOS)
print(f"Size after reducing: {im.size}") if verbose else None
return im


def get_all_mammograms_words(dicom_path: str, report_path: str) -> None:
for file in os.listdir(dicom_path):
words = []
filepath = os.path.join(dicom_path, file)
pixels, ds = dicom2narray(filepath)
ocr_data = get_text_areas(pixels)
if ocr_data is None:
continue
words.extend([data[1] for data in ocr_data])
with open(report_path, 'a') as f:
list(map(lambda x: f.write(f'{x}\n'), words))


if __name__ == '__main__':
print(get_all_mammograms_words('data/input/test_deid_ocr', 'data/input/mammogram_words'))
Loading

0 comments on commit 50c0d06

Please sign in to comment.