Skip to content

Commit

Permalink
Minimal test case demonstrating model stores break on implicit conver…
Browse files Browse the repository at this point in the history
…sions.
  • Loading branch information
jmchilton committed Jan 25, 2024
1 parent 936a35d commit 3a71150
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 10 deletions.
36 changes: 31 additions & 5 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4706,12 +4706,14 @@ def get_converted_dataset(self, trans, target_ext, target_context=None, history=
).values()
)
)
return self.attach_implicitly_converted_dataset(trans.sa_session, new_dataset, target_ext)

def attach_implicitly_converted_dataset(self, session, new_dataset, target_ext: str):
new_dataset.name = self.name
self.copy_attributes(new_dataset)
assoc = ImplicitlyConvertedDatasetAssociation(
parent=self, file_type=target_ext, dataset=new_dataset, metadata_safe=False
)
session = trans.sa_session
session.add(new_dataset)
session.add(assoc)
with transaction(session):
Expand Down Expand Up @@ -6048,7 +6050,7 @@ def inheritable(self):
return True # always allow inheriting, used for replacement


class ImplicitlyConvertedDatasetAssociation(Base, RepresentById):
class ImplicitlyConvertedDatasetAssociation(Base, Serializable):
__tablename__ = "implicitly_converted_dataset_association"

id = Column(Integer, primary_key=True)
Expand Down Expand Up @@ -6086,22 +6088,34 @@ class ImplicitlyConvertedDatasetAssociation(Base, RepresentById):
)

def __init__(
self, id=None, parent=None, dataset=None, file_type=None, deleted=False, purged=False, metadata_safe=True
self,
id=None,
parent=None,
dataset=None,
file_type=None,
deleted=False,
purged=False,
metadata_safe=True,
for_import=False,
):
self.id = id
add_object_to_object_session(self, dataset)
if isinstance(dataset, HistoryDatasetAssociation):
self.dataset = dataset
elif isinstance(dataset, LibraryDatasetDatasetAssociation):
self.dataset_ldda = dataset
else:
elif not for_import:
raise AttributeError(f"Unknown dataset type provided for dataset: {type(dataset)}")
# else if for import - these connections might not have been included in the store,
# recover the data we can?
if isinstance(parent, HistoryDatasetAssociation):
self.parent_hda = parent
elif isinstance(parent, LibraryDatasetDatasetAssociation):
self.parent_ldda = parent
else:
elif not for_import:
raise AttributeError(f"Unknown dataset type provided for parent: {type(parent)}")
# else if for import - these connections might not have been included in the store,
# recover the data we can?
self.type = file_type
self.deleted = deleted
self.purged = purged
Expand All @@ -6121,6 +6135,18 @@ def clear(self, purge=False, delete_dataset=True):
except Exception as e:
log.error(f"Failed to purge associated file ({self.get_file_name()}) from disk: {unicodify(e)}")

def _serialize(self, id_encoder, serialization_options):
rval = dict_for(
self,
file_type=self.type,
)
if self.parent_hda:
rval["parent_hda"] = serialization_options.get_identifier(id_encoder, self.parent_hda)
if self.dataset:
rval["hda"] = serialization_options.get_identifier(id_encoder, self.dataset)
serialization_options.attach_identifier(id_encoder, self, rval)
return rval


DEFAULT_COLLECTION_NAME = "Unnamed Collection"

Expand Down
88 changes: 83 additions & 5 deletions lib/galaxy/model/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,14 @@
ATTRS_FILENAME_LIBRARIES = "libraries_attrs.txt"
ATTRS_FILENAME_LIBRARY_FOLDERS = "library_folders_attrs.txt"
ATTRS_FILENAME_INVOCATIONS = "invocation_attrs.txt"
ATTRS_FILENAME_CONVERSIONS = "implicit_dataset_conversions.txt"
TRACEBACK = "traceback.txt"
GALAXY_EXPORT_VERSION = "2"

DICT_STORE_ATTRS_KEY_HISTORY = "history"
DICT_STORE_ATTRS_KEY_DATASETS = "datasets"
DICT_STORE_ATTRS_KEY_COLLECTIONS = "collections"
DICT_STORE_ATTRS_KEY_CONVERSIONS = "implicit_dataset_conversions"
DICT_STORE_ATTRS_KEY_JOBS = "jobs"
DICT_STORE_ATTRS_KEY_IMPLICIT_COLLECTION_JOBS = "implicit_collection_jobs"
DICT_STORE_ATTRS_KEY_LIBRARIES = "libraries"
Expand Down Expand Up @@ -299,6 +301,10 @@ def invocations_properties(self) -> List[Dict[str, Any]]:
def collections_properties(self) -> List[Dict[str, Any]]:
"""Return a list of HDCA properties."""

@abc.abstractmethod
def implicit_dataset_conversion_properties(self) -> List[Dict[str, Any]]:
"""Return a list of ImplicitlyConvertedDatasetAssociation properties."""

@abc.abstractmethod
def jobs_properties(self) -> List[Dict[str, Any]]:
"""Return a list of jobs properties."""
Expand Down Expand Up @@ -388,6 +394,7 @@ def perform_import(
self._import_collection_instances(object_import_tracker, collections_attrs, history, new_history)
self._import_collection_implicit_input_associations(object_import_tracker, collections_attrs)
self._import_collection_copied_associations(object_import_tracker, collections_attrs)
self._import_implicit_dataset_conversions(object_import_tracker)
self._reassign_hids(object_import_tracker, history)
self._import_jobs(object_import_tracker, history)
self._import_implicit_collection_jobs(object_import_tracker)
Expand Down Expand Up @@ -1004,6 +1011,13 @@ def _reassign_hids(self, object_import_tracker: "ObjectImportTracker", history:
history.stage_addition(obj)
history.add_pending_items()

if object_import_tracker.copy_hid_for:
# in an if to avoid flush if unneeded
for from_dataset, to_dataset in object_import_tracker.copy_hid_for.items():
to_dataset.hid = from_dataset.hid
self._session_add(to_dataset)
self._flush()

def _import_workflow_invocations(
self, object_import_tracker: "ObjectImportTracker", history: Optional[model.History]
) -> None:
Expand Down Expand Up @@ -1245,6 +1259,29 @@ def _import_jobs(self, object_import_tracker: "ObjectImportTracker", history: Op
if object_key in job_attrs:
object_import_tracker.jobs_by_key[job_attrs[object_key]] = imported_job

def _import_implicit_dataset_conversions(self, object_import_tracker: "ObjectImportTracker") -> None:
implicit_dataset_conversion_attrs = self.implicit_dataset_conversion_properties()
for idc_attrs in implicit_dataset_conversion_attrs:
# I don't know what metadata_safe does per se... should we copy this property or
# just set it to False?
metadata_safe = False
idc = model.ImplicitlyConvertedDatasetAssociation(metadata_safe=metadata_safe, for_import=True)
idc.type = idc_attrs["file_type"]
if idc_attrs.get("parent_hda"):
idc.parent_hda = object_import_tracker.hdas_by_key[idc_attrs["parent_hda"]]
if idc_attrs.get("hda"):
idc.dataset = object_import_tracker.hdas_by_key[idc_attrs["hda"]]

# we have a the dataset and the parent, lets ensure they land up with the same HID
if idc.dataset and idc.parent_hda and idc.parent_hda in object_import_tracker.requires_hid:
try:
object_import_tracker.requires_hid.remove(idc.dataset)
except ValueError:
pass # we wanted to remove it anyway.
object_import_tracker.copy_hid_for[idc.parent_hda] = idc.dataset

self._session_add(idc)

def _import_implicit_collection_jobs(self, object_import_tracker: "ObjectImportTracker") -> None:
object_key = self.object_key

Expand Down Expand Up @@ -1306,6 +1343,9 @@ def _copied_from_object_key(
return copied_from_object_key


HasHid = Union[model.HistoryDatasetAssociation, model.HistoryDatasetCollectionAssociation]


class ObjectImportTracker:
"""Keep track of new and existing imported objects.
Expand All @@ -1323,7 +1363,8 @@ class ObjectImportTracker:
hda_copied_from_sinks: Dict[ObjectKeyType, ObjectKeyType]
hdca_copied_from_sinks: Dict[ObjectKeyType, ObjectKeyType]
jobs_by_key: Dict[ObjectKeyType, model.Job]
requires_hid: List[Union[model.HistoryDatasetAssociation, model.HistoryDatasetCollectionAssociation]]
requires_hid: List[HasHid]
copy_hid_for: Dict[HasHid, HasHid]

def __init__(self) -> None:
self.libraries_by_key = {}
Expand All @@ -1341,6 +1382,7 @@ def __init__(self) -> None:
self.implicit_collection_jobs_by_key: Dict[str, ImplicitCollectionJobs] = {}
self.workflows_by_key: Dict[str, model.Workflow] = {}
self.requires_hid = []
self.copy_hid_for = {}

self.new_history: Optional[model.History] = None

Expand Down Expand Up @@ -1424,6 +1466,9 @@ def datasets_properties(
def collections_properties(self) -> List[Dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_COLLECTIONS) or []

def implicit_dataset_conversion_properties(self) -> List[Dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_CONVERSIONS) or []

def library_properties(
self,
) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -1500,6 +1545,9 @@ def datasets_properties(self) -> List[Dict[str, Any]]:
def collections_properties(self) -> List[Dict[str, Any]]:
return self._read_list_if_exists(ATTRS_FILENAME_COLLECTIONS)

def implicit_dataset_conversion_properties(self) -> List[Dict[str, Any]]:
return self._read_list_if_exists(ATTRS_FILENAME_CONVERSIONS)

def library_properties(
self,
) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -1867,6 +1915,7 @@ def __init__(
)
self.export_files = export_files
self.included_datasets: Dict[model.DatasetInstance, Tuple[model.DatasetInstance, bool]] = {}
self.dataset_implicit_conversions: Dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] = {}
self.included_collections: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = []
self.included_libraries: List[model.Library] = []
self.included_library_folders: List[model.LibraryFolder] = []
Expand Down Expand Up @@ -1935,7 +1984,13 @@ def add(src, dest):
if not os.path.exists(dir_path):
os.makedirs(dir_path)

target_filename = get_export_dataset_filename(as_dict["name"], as_dict["extension"], dataset_hid)
conversion = self.dataset_implicit_conversions.get(dataset)
conversion_key = (
self.serialization_options.get_identifier(self.security, conversion) if conversion else None
)
target_filename = get_export_dataset_filename(
as_dict["name"], as_dict["extension"], dataset_hid, conversion_key=conversion_key
)
arcname = os.path.join(dir_name, target_filename)

src = file_name
Expand Down Expand Up @@ -2145,7 +2200,13 @@ def export_history(
if dataset not in self.included_datasets:
if should_include_file:
self._ensure_dataset_file_exists(dataset)
self.add_dataset(dataset, include_files=should_include_file)
if dataset.implicitly_converted_parent_datasets:
# fetching 0th of list but I think this is just a mapping quirk - I can't imagine how there
# would be more than one of these -John
conversion = dataset.implicitly_converted_parent_datasets[0]
self.add_implicit_conversion_dataset(dataset, should_include_file, conversion)
else:
self.add_dataset(dataset, include_files=should_include_file)

def export_library(
self, library: model.Library, include_hidden: bool = False, include_deleted: bool = False
Expand Down Expand Up @@ -2224,6 +2285,15 @@ def add_dataset_collection(
self.collections_attrs.append(collection)
self.included_collections.append(collection)

def add_implicit_conversion_dataset(
self,
dataset: model.DatasetInstance,
include_files: bool,
conversion: model.ImplicitlyConvertedDatasetAssociation,
) -> None:
self.included_datasets[dataset] = (dataset, include_files)
self.dataset_implicit_conversions[dataset] = conversion

def add_dataset(self, dataset: model.DatasetInstance, include_files: bool = True) -> None:
self.included_datasets[dataset] = (dataset, include_files)

Expand Down Expand Up @@ -2270,6 +2340,10 @@ def to_json(attributes):
with open(collections_attrs_filename, "w") as collections_attrs_out:
collections_attrs_out.write(to_json(self.collections_attrs))

conversions_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_CONVERSIONS)
with open(conversions_attrs_filename, "w") as conversions_attrs_out:
conversions_attrs_out.write(to_json(self.dataset_implicit_conversions.values()))

jobs_attrs = []
for job_id, job_output_dataset_associations in self.job_output_dataset_associations.items():
output_dataset_mapping: Dict[str, List[Union[str, int]]] = {}
Expand Down Expand Up @@ -2392,6 +2466,7 @@ class WriteCrates:
included_invocations: List[model.WorkflowInvocation]
export_directory: StrPath
included_datasets: Dict[model.DatasetInstance, Tuple[model.DatasetInstance, bool]]
dataset_implicit_conversions: Dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation]
dataset_id_to_path: Dict[int, Tuple[Optional[str], Optional[str]]]

@property
Expand Down Expand Up @@ -2896,12 +2971,15 @@ def tar_export_directory(export_directory: StrPath, out_file: StrPath, gzip: boo
store_archive.add(os.path.join(export_directory, export_path), arcname=export_path)


def get_export_dataset_filename(name: str, ext: str, hid: int) -> str:
def get_export_dataset_filename(name: str, ext: str, hid: int, conversion_key: Optional[str]) -> str:
"""
Builds a filename for a dataset using its name an extension.
"""
base = "".join(c in FILENAME_VALID_CHARS and c or "_" for c in name)
return f"{base}_{hid}.{ext}"
if not conversion_key:
return f"{base}_{hid}.{ext}"
else:
return f"{base}_{hid}_conversion_{conversion_key}.{ext}"


def imported_store_for_metadata(
Expand Down
1 change: 1 addition & 0 deletions test/unit/data/model/2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
converted bed
27 changes: 27 additions & 0 deletions test/unit/data/model/test_model_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
TESTCASE_DIRECTORY = pathlib.Path(__file__).parent
TEST_PATH_1 = TESTCASE_DIRECTORY / "1.txt"
TEST_PATH_2 = TESTCASE_DIRECTORY / "2.bed"
TEST_PATH_2_CONVERTED = TESTCASE_DIRECTORY / "2.txt"
DEFAULT_OBJECT_STORE_BY = "id"


Expand Down Expand Up @@ -120,6 +121,32 @@ def test_import_export_history_allow_discarded_data():
assert imported_job.output_datasets[0].dataset == datasets[1]


def test_import_export_history_with_implicit_conversion():
app = _mock_app()

u, h, d1, d2, j = _setup_simple_cat_job(app)

convert_ext = "fasta"
implicit_hda = model.HistoryDatasetAssociation(extension=convert_ext, create_dataset=True, flush=False, history=h)
implicit_hda.hid = d2.hid
# this adds and flushes the result...
d2.attach_implicitly_converted_dataset(app.model.context, implicit_hda, convert_ext)
app.object_store.update_from_file(implicit_hda.dataset, file_name=TEST_PATH_2_CONVERTED, create=True)

assert len(h.active_datasets) == 3
imported_history = _import_export_history(app, h, export_files="copy", include_hidden=True)

assert len(imported_history.active_datasets) == 3
recovered_hda_2 = imported_history.active_datasets[1]
assert recovered_hda_2.implicitly_converted_datasets
imported_conversion = recovered_hda_2.implicitly_converted_datasets[0]
assert imported_conversion.type == "fasta"
assert imported_conversion.dataset == imported_history.active_datasets[2]

# implicit conversions have the same HID... ensure this property is recovered...
assert imported_history.active_datasets[2].hid == imported_history.active_datasets[1].hid


def test_import_export_bag_archive():
"""Test a simple job import/export using a BagIt archive."""
dest_parent = mkdtemp()
Expand Down

0 comments on commit 3a71150

Please sign in to comment.