Allow dataset and source checksum validation during materialization.
jmchilton committed Sep 26, 2024
1 parent c2b3a27 commit a6e58e0
Showing 13 changed files with 200 additions and 25 deletions.
4 changes: 3 additions & 1 deletion lib/galaxy/managers/hdas.py
@@ -190,7 +190,9 @@ def materialize(self, request: MaterializeDatasetInstanceTaskRequest, in_place:
else:
dataset_instance = self.ldda_manager.get_accessible(request.content, user)
history = self.app.history_manager.by_id(request.history_id)
new_hda = materializer.ensure_materialized(dataset_instance, target_history=history, in_place=in_place)
new_hda = materializer.ensure_materialized(
dataset_instance, target_history=history, validate_hashes=request.validate_hashes, in_place=in_place
)
if not in_place:
history.add_dataset(new_hda, set_hid=True)
session = self.session()
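
For orientation, a comment-only sketch of how the new flag reaches this manager method; the chain below is assembled from the other hunks in this commit, with class names abbreviated rather than quoted verbatim:

# Sketch of the validate_hashes plumbing (abbreviated, not literal Galaxy code):
#
#   POST /api/histories/{history_id}/contents/datasets/{id}/materialize
#        with optional body {"validate_hashes": true}
#     -> history contents service builds MaterializeDatasetInstanceTaskRequest(validate_hashes=True)
#     -> HDA manager materialize(request, in_place=...)           # the hunk above
#     -> materializer.ensure_materialized(..., validate_hashes=request.validate_hashes)
#     -> _stream_source(...) checks source hashes on download and dataset hashes
#        after transforms/grooming (lib/galaxy/model/deferred.py below)
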
24 changes: 22 additions & 2 deletions lib/galaxy/model/__init__.py
@@ -214,6 +214,7 @@
WorkflowMappingField,
)
from galaxy.util.hash_util import (
HashFunctionNameEnum,
md5_hash_str,
new_insecure_hash,
)
@@ -4493,7 +4494,17 @@ def copy(self) -> "DatasetSource":
return new_source


class DatasetSourceHash(Base, Serializable):
class HasHashFunctionName:
hash_function: Mapped[Optional[str]]

@property
def hash_func_name(self) -> HashFunctionNameEnum:
as_str = self.hash_function
assert as_str
return HashFunctionNameEnum(self.hash_function)


class DatasetSourceHash(Base, Serializable, HasHashFunctionName):
__tablename__ = "dataset_source_hash"

id: Mapped[int] = mapped_column(primary_key=True)
@@ -4518,7 +4529,7 @@ def copy(self) -> "DatasetSourceHash":
return new_hash


class DatasetHash(Base, Dictifiable, Serializable):
class DatasetHash(Base, Dictifiable, Serializable, HasHashFunctionName):
__tablename__ = "dataset_hash"

id: Mapped[int] = mapped_column(primary_key=True)
@@ -4547,6 +4558,15 @@ def copy(self) -> "DatasetHash":
new_hash.extra_files_path = self.extra_files_path
return new_hash

@property
def hash_func_name(self) -> HashFunctionNameEnum:
as_str = self.hash_function
assert as_str
return HashFunctionNameEnum(self.hash_function)


DescribesHash = Union[DatasetSourceHash, DatasetHash]


def datatype_for_extension(extension, datatypes_registry=None) -> "Data":
if extension is not None:
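
A standalone sketch of what the new HasHashFunctionName mixin provides: the mapped column stores a plain string such as "MD5", and hash_func_name normalizes it into the enum the validation helpers expect. The enum and classes below are illustrative stand-ins for galaxy.util.hash_util.HashFunctionNameEnum and the SQLAlchemy-mapped DatasetHash, not the real definitions:

from enum import Enum
from typing import Optional


class HashFunctionNameEnum(str, Enum):
    # Stand-in for galaxy.util.hash_util.HashFunctionNameEnum (other members omitted).
    md5 = "MD5"


class HasHashFunctionName:
    hash_function: Optional[str] = None

    @property
    def hash_func_name(self) -> HashFunctionNameEnum:
        assert self.hash_function, "hash_function must be set before validation"
        return HashFunctionNameEnum(self.hash_function)


class FakeDatasetHash(HasHashFunctionName):
    # Plain-Python stand-in for the mapped DatasetHash class above.
    def __init__(self, hash_function: str, hash_value: str):
        self.hash_function = hash_function
        self.hash_value = hash_value


h = FakeDatasetHash("MD5", "f568c29421792b1b1df4474dafae01f1")
assert h.hash_func_name is HashFunctionNameEnum.md5
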
70 changes: 57 additions & 13 deletions lib/galaxy/model/deferred.py
@@ -4,6 +4,7 @@
import shutil
from typing import (
cast,
List,
NamedTuple,
Optional,
Union,
@@ -25,7 +26,9 @@
Dataset,
DatasetCollection,
DatasetCollectionElement,
DatasetHash,
DatasetSource,
DescribesHash,
History,
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
@@ -36,6 +39,7 @@
ObjectStore,
ObjectStorePopulator,
)
from galaxy.util.hash_util import verify_hash

log = logging.getLogger(__name__)

@@ -95,6 +99,7 @@ def ensure_materialized(
self,
dataset_instance: Union[HistoryDatasetAssociation, LibraryDatasetDatasetAssociation],
target_history: Optional[History] = None,
validate_hashes: bool = False,
in_place: bool = False,
) -> HistoryDatasetAssociation:
"""Create a new detached dataset instance from the supplied instance.
@@ -107,15 +112,21 @@
if dataset.state != Dataset.states.DEFERRED and isinstance(dataset_instance, HistoryDatasetAssociation):
return dataset_instance

materialized_dataset = Dataset()
materialized_dataset.state = Dataset.states.OK
materialized_dataset.deleted = False
materialized_dataset.purged = False
materialized_dataset.sources = [s.copy() for s in dataset.sources]
materialized_dataset.hashes = [h.copy() for h in dataset.hashes]

materialized_dataset_hashes = [h.copy() for h in dataset.hashes]
if in_place:
materialized_dataset = dataset_instance.dataset
materialized_dataset.state = Dataset.states.OK
else:
materialized_dataset = Dataset()
materialized_dataset.state = Dataset.states.OK
materialized_dataset.deleted = False
materialized_dataset.purged = False
materialized_dataset.sources = [s.copy() for s in dataset.sources]
materialized_dataset.hashes = materialized_dataset_hashes
target_source = self._find_closest_dataset_source(dataset)
transient_paths = None

exception_materializing: Optional[Exception] = None
if attached:
object_store_populator = self._object_store_populator
assert object_store_populator
@@ -131,17 +142,28 @@
with transaction(sa_session):
sa_session.commit()
object_store_populator.set_dataset_object_store_id(materialized_dataset)
path = self._stream_source(target_source, datatype=dataset_instance.datatype)
object_store.update_from_file(materialized_dataset, file_name=path)
try:
path = self._stream_source(
target_source, dataset_instance.datatype, validate_hashes, materialized_dataset_hashes
)
object_store.update_from_file(materialized_dataset, file_name=path)
materialized_dataset.set_size()
except Exception as e:
exception_materializing = e
else:
transient_path_mapper = self._transient_path_mapper
assert transient_path_mapper
transient_paths = transient_path_mapper.transient_paths_for(dataset)
# TODO: optimize this by streaming right to this path...
# TODO: take into account transform and ensure we are and are not modifying the file as appropriate.
path = self._stream_source(target_source, datatype=dataset_instance.datatype)
shutil.move(path, transient_paths.external_filename)
materialized_dataset.external_filename = transient_paths.external_filename
try:
path = self._stream_source(
target_source, dataset_instance.datatype, validate_hashes, materialized_dataset_hashes
)
shutil.move(path, transient_paths.external_filename)
materialized_dataset.external_filename = transient_paths.external_filename
except Exception as e:
exception_materializing = e

history = target_history
if history is None and isinstance(dataset_instance, HistoryDatasetAssociation):
@@ -159,6 +181,11 @@
else:
assert isinstance(dataset_instance, HistoryDatasetAssociation)
materialized_dataset_instance = cast(HistoryDatasetAssociation, dataset_instance)
if exception_materializing is not None:
materialized_dataset.state = Dataset.states.ERROR
materialized_dataset_instance.info = (
f"Failed to materialize deferred dataset with exception: {exception_materializing}"
)
if attached:
sa_session = self._sa_session
if sa_session is None:
@@ -184,11 +211,17 @@
materialized_dataset_instance.metadata_deferred = False
return materialized_dataset_instance

def _stream_source(self, target_source: DatasetSource, datatype) -> str:
def _stream_source(
self, target_source: DatasetSource, datatype, validate_hashes: bool, dataset_hashes: List[DatasetHash]
) -> str:
source_uri = target_source.source_uri
if source_uri is None:
raise Exception("Cannot stream from dataset source without specified source_uri")
path = stream_url_to_file(source_uri, file_sources=self._file_sources)
if validate_hashes and target_source.hashes:
for source_hash in target_source.hashes:
_validate_hash(path, source_hash, "downloaded file")

transform = target_source.transform or []
to_posix_lines = False
spaces_to_tabs = False
@@ -210,6 +243,11 @@ def _stream_source(self, target_source: DatasetSource, datatype) -> str:
path = convert_result.converted_path
if datatype_groom:
datatype.groom_dataset_content(path)

if validate_hashes and dataset_hashes:
for dataset_hash in dataset_hashes:
_validate_hash(path, dataset_hash, "dataset contents")

return path

def _find_closest_dataset_source(self, dataset: Dataset) -> DatasetSource:
@@ -306,3 +344,9 @@ def materializer_factory(
file_sources=file_sources,
sa_session=sa_session,
)


def _validate_hash(path: str, describes_hash: DescribesHash, what: str) -> None:
hash_value = describes_hash.hash_value
if hash_value is not None:
verify_hash(path, hash_func_name=describes_hash.hash_func_name, hash_value=hash_value)
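
To make the two validation points concrete: source hashes describe the bytes as downloaded, so they are checked right after stream_url_to_file, while dataset hashes describe the final content, so they are checked after any to_posix_lines/spaces_to_tabs transform and datatype grooming. The helper below is a minimal stdlib-only sketch of the comparison that verify_hash performs; the real implementation uses Galaxy's memory_bound_hexdigest:

import hashlib


def verify_file_hash(path: str, hash_func_name: str, expected: str, what: str = "path") -> None:
    # hash_func_name is a display name such as "MD5" or "SHA-256"; normalize it for hashlib.
    digest = hashlib.new(hash_func_name.replace("-", "").lower())
    with open(path, "rb") as f:
        # Read in 1 MiB chunks so large datasets are never held in memory at once.
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    calculated = digest.hexdigest()
    if calculated != expected:
        raise Exception(
            f"Failed to validate {what} with [{hash_func_name}] - expected [{expected}] got [{calculated}]"
        )
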
2 changes: 1 addition & 1 deletion lib/galaxy/model/unittest_utils/store_fixtures.py
@@ -11,7 +11,7 @@
TEST_SOURCE_URI = "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/2.bed"
TEST_SOURCE_URI_BAM = "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam"
TEST_HASH_FUNCTION = "MD5"
TEST_HASH_VALUE = "moocowpretendthisisahas"
TEST_HASH_VALUE = "f568c29421792b1b1df4474dafae01f1"
TEST_HISTORY_NAME = "My history in a model store"
TEST_EXTENSION = "bed"
TEST_LIBRARY_NAME = "My cool library"
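
The placeholder TEST_HASH_VALUE is replaced by what the fixture now treats as the actual MD5 of test-data/2.bed, so model-store tests can exercise real hash validation. For reference, a value like this can be reproduced with the standard library; the path below assumes a Galaxy checkout:

import hashlib
from pathlib import Path

# Per the updated fixture, this should print f568c29421792b1b1df4474dafae01f1
# for test-data/2.bed in a Galaxy checkout.
print(hashlib.md5(Path("test-data/2.bed").read_bytes()).hexdigest())
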
10 changes: 9 additions & 1 deletion lib/galaxy/schema/schema.py
@@ -3681,7 +3681,15 @@ class PageSummaryBase(Model):
)


class MaterializeDatasetInstanceAPIRequest(Model):
class MaterializeDatasetOptions(Model):
validate_hashes: bool = Field(
False,
title="Validate hashes",
description="Set to true to enable dataset validation during materialization.",
)


class MaterializeDatasetInstanceAPIRequest(MaterializeDatasetOptions):
source: DatasetSourceType = Field(
title="Source",
description="The source of the content. Can be other history element to be copied or library elements.",
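
The options are split into their own model so the materialize endpoint can accept an optional request body without altering the existing MaterializeDatasetInstanceAPIRequest fields. A runnable pydantic stub that mirrors the new model (the real class lives in galaxy.schema.schema and derives from Galaxy's Model base):

from pydantic import BaseModel, Field


class MaterializeDatasetOptions(BaseModel):
    # Mirrors the Galaxy schema class of the same name; the False default keeps
    # clients that send no body on the old, non-validating behavior.
    validate_hashes: bool = Field(
        False,
        title="Validate hashes",
        description="Set to true to enable dataset validation during materialization.",
    )


assert MaterializeDatasetOptions().validate_hashes is False
assert MaterializeDatasetOptions.model_validate({"validate_hashes": True}).validate_hashes is True
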
5 changes: 5 additions & 0 deletions lib/galaxy/schema/tasks.py
@@ -108,6 +108,11 @@ class MaterializeDatasetInstanceTaskRequest(Model):
"- The decoded id of the HDA\n"
),
)
validate_hashes: bool = Field(
False,
title="Validate hashes",
description="Set to true to enable dataset validation during materialization.",
)


class ComputeDatasetHashTaskRequest(Model):
8 changes: 2 additions & 6 deletions lib/galaxy/tools/data_fetch.py
@@ -34,7 +34,7 @@
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.hash_util import (
HASH_NAMES,
memory_bound_hexdigest,
verify_hash,
)

DESCRIPTION = """Data Import Script"""
@@ -515,11 +515,7 @@ def _has_src_to_path(upload_config, item, is_dataset=False) -> Tuple[str, str]:

def _handle_hash_validation(upload_config, hash_function, hash_value, path):
if upload_config.validate_hashes:
calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=path)
if calculated_hash_value != hash_value:
raise Exception(
f"Failed to validate upload with [{hash_function}] - expected [{hash_value}] got [{calculated_hash_value}]"
)
verify_hash(path, hash_func_name=hash_function, hash_value=hash_value, what="upload")


def _arg_parser():
8 changes: 8 additions & 0 deletions lib/galaxy/util/hash_util.py
@@ -153,6 +153,14 @@ def parse_checksum_hash(checksum: str) -> Tuple[HashFunctionNameEnum, str]:
return HashFunctionNameEnum(hash_name), hash_value


def verify_hash(path: str, hash_func_name: HashFunctionNameEnum, hash_value: str, what: str = "path"):
calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_func_name, path=path)
if calculated_hash_value != hash_value:
raise Exception(
f"Failed to validate {what} with [{hash_func_name}] - expected [{hash_value}] got [{calculated_hash_value}]"
)


__all__ = (
"md5",
"hashlib",
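
A hedged usage sketch of the new helper, assuming an environment where galaxy.util.hash_util is importable; the path and expected value are taken from the updated test fixture and stand in for any local file:

from galaxy.util.hash_util import HashFunctionNameEnum, verify_hash

# Raises Exception on mismatch, returns None when the digest matches.
# HashFunctionNameEnum("MD5") mirrors what DatasetHash.hash_func_name returns.
verify_hash(
    "test-data/2.bed",
    hash_func_name=HashFunctionNameEnum("MD5"),
    hash_value="f568c29421792b1b1df4474dafae01f1",
    what="downloaded file",
)
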
4 changes: 4 additions & 0 deletions lib/galaxy/webapps/galaxy/api/history_contents.py
@@ -54,6 +54,7 @@
HistoryContentType,
MaterializeDatasetInstanceAPIRequest,
MaterializeDatasetInstanceRequest,
MaterializeDatasetOptions,
StoreExportPayload,
UpdateHistoryContentsBatchPayload,
UpdateHistoryContentsPayload,
@@ -1072,12 +1073,15 @@ def materialize_dataset(
history_id: HistoryIDPathParam,
id: HistoryItemIDPathParam,
trans: ProvidesHistoryContext = DependsOnTrans,
materialize_api_payload: Optional[MaterializeDatasetOptions] = Body(None),
) -> AsyncTaskResultSummary:
validate_hashes: bool = materialize_api_payload.validate_hashes if materialize_api_payload is not None else False
# values are already validated, use model_construct
materialize_request = MaterializeDatasetInstanceRequest.model_construct(
history_id=history_id,
source=DatasetSourceType.hda,
content=id,
validate_hashes=validate_hashes,
)
rval = self.service.materialize(trans, materialize_request)
return rval
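
A sketch of calling the updated endpoint from a client; the Galaxy URL, API key, and encoded IDs below are placeholders. Omitting the JSON body entirely preserves the previous behavior, since validate_hashes defaults to False:

import requests

GALAXY_URL = "https://galaxy.example.org"  # placeholder
API_KEY = "your-api-key"                   # placeholder
history_id = "abc123"                      # encoded history id (placeholder)
dataset_id = "def456"                      # encoded HDA id (placeholder)

response = requests.post(
    f"{GALAXY_URL}/api/histories/{history_id}/contents/datasets/{dataset_id}/materialize",
    headers={"x-api-key": API_KEY},
    json={"validate_hashes": True},
)
response.raise_for_status()
print(response.json())  # summary of the queued materialization task
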
1 change: 1 addition & 0 deletions lib/galaxy/webapps/galaxy/services/history_contents.py
@@ -573,6 +573,7 @@ def materialize(
history_id=request.history_id,
source=request.source,
content=request.content,
validate_hashes=request.validate_hashes,
user=trans.async_request_user,
)
results = materialize_task.delay(request=task_request)
6 changes: 5 additions & 1 deletion lib/galaxy_test/base/populators.py
@@ -981,7 +981,9 @@ def tools_post(self, payload: dict, url="tools") -> Response:
tool_response = self._post(url, data=payload)
return tool_response

def materialize_dataset_instance(self, history_id: str, id: str, source: str = "hda"):
def materialize_dataset_instance(
self, history_id: str, id: str, source: str = "hda", validate_hashes: bool = False
):
payload: Dict[str, Any]
if source == "ldda":
url = f"histories/{history_id}/materialize"
@@ -992,6 +994,8 @@ def materialize_dataset_instance(self, history_id: str, id: str, source: str = "hda"):
else:
url = f"histories/{history_id}/contents/datasets/{id}/materialize"
payload = {}
if validate_hashes:
payload["validate_hashes"] = True
create_response = self._post(url, payload, json=True)
api_asserts.assert_status_code_is_ok(create_response)
create_response_json = create_response.json()
20 changes: 20 additions & 0 deletions test/integration/test_materialize_dataset_instance_tasks.py
@@ -78,6 +78,26 @@ def test_materialize_gxfiles_uri(self, history_id: str):
assert new_hda_details["state"] == "ok"
assert not new_hda_details["deleted"]

@pytest.mark.require_new_history
def test_materialize_hash_failure(self, history_id: str):
store_dict = deferred_hda_model_store_dict(source_uri="gxfiles://testdatafiles/2.bed")
store_dict["datasets"][0]["file_metadata"]["hashes"][0]["hash_value"] = "invalidhash"
as_list = self.dataset_populator.create_contents_from_store(history_id, store_dict=store_dict)
assert len(as_list) == 1
deferred_hda = as_list[0]
assert deferred_hda["model_class"] == "HistoryDatasetAssociation"
assert deferred_hda["state"] == "deferred"
assert not deferred_hda["deleted"]

self.dataset_populator.materialize_dataset_instance(history_id, deferred_hda["id"], validate_hashes=True)
self.dataset_populator.wait_on_history_length(history_id, 2)
new_hda_details = self.dataset_populator.get_history_dataset_details(
history_id, hid=2, assert_ok=False, wait=False
)
assert new_hda_details["model_class"] == "HistoryDatasetAssociation"
assert new_hda_details["state"] == "error"
assert not new_hda_details["deleted"]

@pytest.mark.require_new_history
def test_materialize_history_dataset_bam(self, history_id: str):
as_list = self.dataset_populator.create_contents_from_store(