Skip to content

Commit

Permalink
Initial implementation ome-tiff field validation
Browse files Browse the repository at this point in the history
  • Loading branch information
jswelling committed Oct 4, 2024
1 parent 2d69c56 commit 7d65022
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 0 deletions.
134 changes: 134 additions & 0 deletions src/ingest_validation_tests/ome_tiff_field_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import json
import re
from pathlib import Path
from multiprocessing import Pool
from os import cpu_count
from typing import List, Optional
from functools import partial
from pprint import pprint

import tifffile
import xmlschema
from ingest_validation_tools.plugin_validator import Validator


def _log(message: str):
print(message)


def expand_terms(dct: dict, prefix: str = "") -> dict:
    """
    Flatten a nested dict of XML info, as provided by xmlschema, into the
    single-level form used in the dictionary of expected fields.

    Result keys are built by joining the nesting path with "_", e.g.
    ``{"Pixels": [{"@SizeX": 10}]}`` becomes ``{"Pixels_SizeX": 10}``.

    :param dct: the (possibly nested) dict to flatten
    :param prefix: key path accumulated so far; empty at the top level
    :return: a new flat dict; *dct* is not modified
    :raises AssertionError: if a list value does not hold exactly one element
    :raises ValueError: if a non-terminal value is not a list, dict or None
    """
    rslt = {}
    expanded_prefix = prefix + "_" if prefix else ""
    for key, val in dct.items():
        if key.startswith("@"):  # terminal element; the "@" marker is dropped
            rslt[expanded_prefix + key[1:]] = val
        elif key == "$" and isinstance(val, str):  # special case?
            rslt[expanded_prefix + key] = val
        elif isinstance(val, list):
            # Raised explicitly rather than via `assert` so the check is not
            # stripped under `python -O` (which would silently drop every
            # element after the first).  The exception type is unchanged.
            if len(val) != 1:
                raise AssertionError("Expected only one element in list of dicts")
            rslt.update(expand_terms(val[0], expanded_prefix + key))
        elif isinstance(val, dict):
            rslt.update(expand_terms(val, expanded_prefix + key))
        elif val is None:
            rslt[expanded_prefix + key] = None
        else:
            raise ValueError(f"list or dict expected; got {type(val)} {val}")
    return rslt


def check_one_prop(key: str, all_prop_dct: dict, this_test: dict) -> None:
    """
    Check one flattened ome-tiff property against its test description.

    The test's "dtype" entry selects the check:

    * "trap": fail if *key* is present at all.  This is useful when you want
      to scan lots of ome-tiff files for an example of a new field type.
    * "categorical": *key* must be present and its value must be one of the
      test's "allowed_values".
    * "integer": *key* must be present and its value must be an ``int``.
    * "float": *key* must be present and its value must be a ``float``.

    :param key: name of the property to check
    :param all_prop_dct: flat dict of all properties found in the file
    :param this_test: test description; must contain "dtype"
    :raises RuntimeError: if a "trap" test finds its key
    :raises AssertionError: if a required key is missing or its value fails
        the check
    :raises NotImplementedError: if the dtype is not one of those listed above
    """
    # Failures are raised explicitly rather than via `assert` so that the
    # validation still happens under `python -O`; the exception type and the
    # messages are unchanged.
    test_type = this_test["dtype"]
    if test_type == "trap":
        if key in all_prop_dct:
            raise RuntimeError(f"TRAP: {key} -> {all_prop_dct[key]} vs {this_test}")
    elif test_type == "categorical":
        allowed_vals = this_test["allowed_values"]
        if key not in all_prop_dct:
            raise AssertionError(f"{key} is required but missing")
        if all_prop_dct[key] not in allowed_vals:
            raise AssertionError(
                f"{key} = {all_prop_dct[key]}" f" not one of {allowed_vals}"
            )
    elif test_type == "integer":
        if key not in all_prop_dct:
            raise AssertionError(f"{key} is required but missing")
        if not isinstance(all_prop_dct[key], int):
            raise AssertionError(f"{key} = {all_prop_dct[key]}" f" is not an int")
    elif test_type == "float":
        if key not in all_prop_dct:
            raise AssertionError(f"{key} is required but missing")
        if not isinstance(all_prop_dct[key], float):
            raise AssertionError(f"{key} = {all_prop_dct[key]}" f" is not a float")
    else:
        raise NotImplementedError(f"Unimplemented dtype {test_type} for ome-tiff field")


def _check_ome_tiff_file(file: str, /, tests: dict) -> Optional[str]:
try:
with tifffile.TiffFile(file) as tf:
xml_document = xmlschema.XmlDocument(tf.ome_metadata)
image_props = xmlschema.to_dict(xml_document)["Image"]
expanded_props = {}
for term_dct in image_props:
expanded_props.update(expand_terms(term_dct))
# print("EXPANDED PROPS FOLLOWS")
# pprint(expanded_props)
# print("EXPANDED PROPS ABOVE; TESTS FOLLOW")
# pprint(tests)
# print("TESTS ABOVE")
for key in tests:
check_one_prop(key, expanded_props, tests[key])
except Exception as excp:
return f"{file} is not a valid OME.TIFF file: {excp}"


class OmeTiffFieldValidator(Validator):
    """
    Validator plugin which checks the metadata fields of every ome-tiff file
    found under the validator's paths against the field tests configured for
    the current assay type in ome_tiff_fields.json.
    """

    description = "Recursively test all ome-tiff files for an assay-specific list of fields"
    cost = 1.0
    version = "1.0"

    def collect_errors(self, **kwargs) -> List[Optional[str]]:
        """
        Test all ome-tiff files under self.paths.

        :param kwargs: accepted for interface compatibility; currently unused
            (see the note on ``threads`` below)
        :return: the list of error messages if any file failed, ``[None]`` if
            files were tested and all passed, or ``[]`` if no files were found
        """
        cfg_path = Path(__file__).parent / "ome_tiff_fields.json"
        # The config holds non-ASCII text (e.g. "µm"), so decode it explicitly
        # rather than relying on the locale's default encoding.
        cfg_list = json.loads(cfg_path.read_text(encoding="utf-8"))
        # TODO: need a jsonschema test of cfg_list here
        all_tests = {}
        for test_set in cfg_list:
            if re.fullmatch(test_set["re"], self.assay_type):
                all_tests.update(test_set["fields"])

        # NOTE: the worker count is pinned to 1; the "coreuse" kwarg
        # (formerly `kwargs.get("coreuse", None) or cpu_count() // 4 or 1`)
        # is deliberately not consulted at present.
        threads = 1
        _log(f"Threading at OmeTiffFieldValidator with {threads}")

        filenames_to_test = []
        for glob_expr in (
            "**/*.ome.tif",
            "**/*.ome.tiff",
            "**/*.OME.TIFF",
            "**/*.OME.TIF",
        ):
            for path in self.paths:
                filenames_to_test.extend(path.glob(glob_expr))
        if not filenames_to_test:
            # Nothing to check, so don't bother spawning worker processes.
            return []

        check_file = partial(_check_ome_tiff_file, tests=all_tests)
        # The context manager shuts the pool down on exit; all results have
        # been consumed by then, so no work is lost and no workers leak.
        with Pool(threads) as pool:
            rslt_list: List[Optional[str]] = [
                rslt
                for rslt in pool.imap_unordered(check_file, filenames_to_test)
                if rslt is not None
            ]
        if rslt_list:
            return rslt_list
        return [None]
56 changes: 56 additions & 0 deletions src/ingest_validation_tests/ome_tiff_fields.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[
{
"re": "C.*EX",
"fields": {
"Pixels_DimensionOrder": {
"dtype": "categorical",
"allowed_values": ["XYZCT", "XYZTC", "XYCTZ", "XYCZT", "XYTCZ", "XYTZC"]
},
"Pixels_Type": {
"dtype": "categorical",
"allowed_values": ["bit", "complex", "double", "double-complex", "float",
"int8", "int12", "int14", "int16", "int32", "uint8",
"uint12", "uint14", "uint16", "uint32", "Other"]
},
"Pixels_SizeX": {
"dtype": "integer"
},
"Pixels_SizeY": {
"dtype": "integer"
},
"Pixels_SizeZ": {
"dtype": "integer"
},
"Pixels_SizeC": {
"dtype": "integer"
},
"Pixels_SizeT": {
"dtype": "integer"
},
"Pixels_PhysicalSizeX": {
"dtype": "float",
"msg": "dtype should be float"
},
"Pixels_PhysicalSizeXUnit": {
"dtype": "categorical",
"allowed_values": ["µm"]
},
"Pixels_PhysicalSizeY": {
"dtype": "float",
"msg": "dtype should be float"
},
"Pixels_PhysicalSizeYUnit": {
"dtype": "categorical",
"allowed_values": ["µm"]
},
"Pixels_PhysicalSizeZ": {
"dtype": "float",
"msg": "dtype should be float"
},
"Pixels_PhysicalSizeZUnit": {
"dtype": "categorical",
"allowed_values": ["µm"]
}
}
}
]
48 changes: 48 additions & 0 deletions tests/test_ome_tiff_field_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import re
import zipfile
from pathlib import Path

import pytest


@pytest.mark.parametrize(
    ("test_data_fname", "msg_re_list", "assay_type"),
    (
        (
            "test_data/codex_tree_ometiff_bad.zip",
            [
                ".*tubhiswt_C0_bad.ome.tif is not a valid OME.TIFF file.*",
                ".*sample1.ome.tif is not a valid OME.TIFF file.*",
                ".*sample2.ome.tif is not a valid OME.TIFF file.*",
            ],
            "CODEX",
        ),
        (
            "test_data/codex_tree_ometiff_good.zip",
            [
                ".*sample1.ome.tif is not a valid OME.TIFF file.*",
                ".*sample2.ome.tif is not a valid OME.TIFF file.*",
            ],
            "CODEX",
        ),
        ("test_data/fake_snrnaseq_tree_good.zip", [], "snRNAseq"),
    ),
)
def test_ome_tiff_field_validator(test_data_fname, msg_re_list, assay_type, tmp_path):
    """
    Unpack a test data tree, run OmeTiffFieldValidator on it, and check that
    each returned error is matched by a distinct expected pattern.

    An entry of None in *msg_re_list* matches only an error of None.
    """
    from ome_tiff_field_validator import OmeTiffFieldValidator

    test_data_path = Path(test_data_fname)
    # Close the archive as soon as it has been unpacked.
    with zipfile.ZipFile(test_data_path) as zfile:
        zfile.extractall(tmp_path)
    validator = OmeTiffFieldValidator(tmp_path / test_data_path.stem, assay_type)
    errors = validator.collect_errors(coreuse=4)
    assert len(msg_re_list) == len(errors)

    # Work on a copy so the parametrized argument itself is never mutated.
    remaining_res = list(msg_re_list)
    unmatched_errors = []
    for err_str in errors:
        for re_str in remaining_res:
            if err_str is None or re_str is None:
                # re.match() cannot take None; None only ever matches None.
                matched = err_str is None and re_str is None
            else:
                matched = re.match(re_str, err_str, flags=re.MULTILINE) is not None
            if matched:
                # Each expected pattern may account for only one error.
                remaining_res.remove(re_str)
                break
        else:
            unmatched_errors.append(err_str)
    assert not unmatched_errors, f"Unmatched errors: {unmatched_errors}"

0 comments on commit 7d65022

Please sign in to comment.