Commit 339bdfd

Allow workflows to consume deferred {src: url} dicts.

jmchilton committed Sep 10, 2024
1 parent 76ca391 commit 339bdfd
Showing 11 changed files with 175 additions and 24 deletions.
13 changes: 13 additions & 0 deletions lib/galaxy/config/schemas/config_schema.yml
@@ -3738,6 +3738,19 @@ mapping:
Optional configuration file similar to `job_config_file` to specify
which Galaxy processes should schedule workflows.
workflow_scheduling_separate_materialization_iteration:
type: bool
default: true
required: false
desc: |
Workflows launched with URI/URL inputs that are not marked as 'deferred'
are "materialized" (i.e. undeferred) by the workflow scheduler. This can be
a lengthy process. Setting this to 'True' places the invocation back in
the queue after materialization and before scheduling the workflow, so it is
less likely to starve other workflow scheduling. Ideally, Galaxy would allow
more fine-grained control of handlers, but until then this provides a way to
tip the balance between "doing more work" and "being more fair".
cache_user_job_count:
type: bool
default: false
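
For reference, a minimal sketch (not from the commit) of how a deployer might override the new option in `galaxy.yml`; the top-level `galaxy:` mapping is the standard config key, and `false` flips off the extra requeue iteration described above:

```yaml
# galaxy.yml (sketch): materialize URL inputs and schedule the workflow in the
# same handler iteration instead of requeueing the invocation in between.
galaxy:
  workflow_scheduling_separate_materialization_iteration: false
```
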
8 changes: 7 additions & 1 deletion lib/galaxy/jobs/handler.py
@@ -130,7 +130,13 @@ def setup_query(self):
if self.grab_model is model.Job:
grab_condition = self.grab_model.state == self.grab_model.states.NEW
elif self.grab_model is model.WorkflowInvocation:
grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING))
grab_condition = self.grab_model.state.in_(
(
self.grab_model.states.NEW,
self.grab_model.states.REQUIRES_MATERIALIZATION,
self.grab_model.states.CANCELLING,
)
)
else:
raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented")
subq = (
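
The effect of the handler change is that invocations in the new state become eligible for grabbing alongside NEW and CANCELLING ones. A self-contained sketch of the same `in_()` state filter using a toy model (not Galaxy's actual schema):

```python
# Sketch: filtering rows by a set of states, as in the grab condition above.
from sqlalchemy import Integer, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class Invocation(Base):
    __tablename__ = "invocation"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    state: Mapped[str] = mapped_column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

GRABBABLE = ("new", "requires_materialization", "cancelling")

with Session(engine) as session:
    session.add_all(Invocation(state=s) for s in ("new", "ready", "requires_materialization"))
    session.commit()
    # Any invocation in one of the grabbable states is eligible for this handler.
    grabbed = session.scalars(select(Invocation).where(Invocation.state.in_(GRABBABLE))).all()
    print([i.state for i in grabbed])  # ['new', 'requires_materialization']
```
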
15 changes: 15 additions & 0 deletions lib/galaxy/managers/hdas.py
@@ -44,13 +44,15 @@
taggable,
users,
)
from galaxy.managers.context import ProvidesHistoryContext
from galaxy.model import (
Job,
JobStateHistory,
JobToOutputDatasetAssociation,
)
from galaxy.model.base import transaction
from galaxy.model.deferred import materializer_factory
from galaxy.model.dereference import dereference_to_model
from galaxy.schema.schema import DatasetSourceType
from galaxy.schema.storage_cleaner import (
CleanableItemsSummary,
@@ -68,6 +70,7 @@
MinimalManagerApp,
StructuredApp,
)
from galaxy.tool_util.parameters import DataRequestUri
from galaxy.util.compression_utils import get_fileobj

log = logging.getLogger(__name__)
@@ -343,6 +346,18 @@ def _set_permissions(self, trans, hda, role_ids_dict):
raise exceptions.RequestParameterInvalidException(error)


def dereference_input(
trans: ProvidesHistoryContext, data_request: DataRequestUri, history: Optional[model.History] = None
) -> model.HistoryDatasetAssociation:
target_history = history or trans.history
hda = dereference_to_model(trans.sa_session, trans.user, target_history, data_request)
permissions = trans.app.security_agent.history_get_default_permissions(target_history)
trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False)
with transaction(trans.sa_session):
trans.sa_session.commit()
return hda


class HDAStorageCleanerManager(base.StorageCleanerManager):
def __init__(self, hda_manager: HDAManager, dataset_manager: datasets.DatasetManager):
self.hda_manager = hda_manager
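
A hedged sketch of calling the new helper; `trans` is assumed to be a live `ProvidesHistoryContext`, and only the request fields exercised in this commit (`src`, `url`, `deferred`) are used:

```python
# Sketch: turn a raw {src: url} dict into a (deferred) HDA in the user's history.
from galaxy.managers.hdas import dereference_input
from galaxy.tool_util.parameters import DataRequestUri

raw = {"src": "url", "url": "https://example.org/data/reads.fastq", "deferred": True}
data_request = DataRequestUri.model_validate(raw)
hda = dereference_input(trans, data_request)  # history defaults to trans.history
# The HDA now carries the history's default permissions and has been committed.
```
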
12 changes: 5 additions & 7 deletions lib/galaxy/managers/jobs.py
@@ -50,7 +50,10 @@
ProvidesUserContext,
)
from galaxy.managers.datasets import DatasetManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.hdas import (
dereference_input,
HDAManager,
)
from galaxy.managers.histories import HistoryManager
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.users import UserManager
@@ -67,7 +70,6 @@
YIELD_PER_ROWS,
)
from galaxy.model.base import transaction
from galaxy.model.dereference import dereference_to_model
from galaxy.model.index_filter_util import (
raw_text_column_filter,
text_column_filter,
@@ -1219,12 +1221,8 @@ def dereference(

def dereference_callback(data_request: DataRequestUri) -> DataRequestInternalHda:
# a deferred dataset corresponding to request
hda = dereference_to_model(trans.sa_session, trans.user, trans.history, data_request)
hda = dereference_input(trans, data_request)
new_hdas.append(DereferencedDatasetPair(hda, data_request))
permissions = trans.app.security_agent.history_get_default_permissions(trans.history)
trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False)
with transaction(trans.sa_session):
trans.sa_session.commit()
return DataRequestInternalHda(id=hda.id)

tool_state = RequestInternalToolState(tool_request.request)
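
The surrounding `dereference` method visits a tool request state and swaps each URI request for an internal HDA reference via this callback. A toy, self-contained sketch of that visit-and-replace pattern (not Galaxy's actual visitor):

```python
# Sketch: replace every {"src": "url", ...} leaf in a nested request state.
from typing import Any, Callable, Dict

def visit(state: Any, on_url: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Any:
    if isinstance(state, dict):
        if state.get("src") == "url":
            return on_url(state)  # e.g. dereference, then return an internal reference
        return {k: visit(v, on_url) for k, v in state.items()}
    if isinstance(state, list):
        return [visit(v, on_url) for v in state]
    return state

request = {"input1": {"src": "url", "url": "https://example.org/1.txt", "deferred": True}}
print(visit(request, lambda d: {"src": "hda", "id": 42}))  # id 42 is hypothetical
# {'input1': {'src': 'hda', 'id': 42}}
```
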
28 changes: 27 additions & 1 deletion lib/galaxy/model/__init__.py
@@ -18,6 +18,7 @@
import string
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from datetime import (
datetime,
timedelta,
@@ -8547,6 +8548,12 @@ class StoredWorkflowMenuEntry(Base, RepresentById):
)


@dataclass
class InputWithRequest:
input: Any
request: Dict[str, Any]


class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializable):
__tablename__ = "workflow_invocation"

@@ -8778,6 +8785,7 @@ def poll_active_workflow_ids(engine, scheduler=None, handler=None):
and_conditions = [
or_(
WorkflowInvocation.state == WorkflowInvocation.states.NEW,
WorkflowInvocation.state == WorkflowInvocation.states.REQUIRES_MATERIALIZATION,
WorkflowInvocation.state == WorkflowInvocation.states.READY,
WorkflowInvocation.state == WorkflowInvocation.states.CANCELLING,
),
@@ -8869,6 +8877,14 @@ def input_associations(self):
inputs.append(input_dataset_collection_assoc)
return inputs

def inputs_requiring_materialization(self):
hdas_to_materialize = []
for input_dataset_assoc in self.input_datasets:
request = input_dataset_assoc.request
if request and not request.get("deferred", False):
hdas_to_materialize.append(input_dataset_assoc.dataset)
return hdas_to_materialize

def _serialize(self, id_encoder, serialization_options):
invocation_attrs = dict_for(self)
invocation_attrs["state"] = self.state
@@ -9031,20 +9047,28 @@ def attach_step(request_to_content):
else:
request_to_content.workflow_step = step

request: Optional[Dict[str, Any]] = None
if isinstance(content, InputWithRequest):
request = content.request
content = content.input

history_content_type = getattr(content, "history_content_type", None)
if history_content_type == "dataset":
request_to_content = WorkflowRequestToInputDatasetAssociation()
request_to_content.dataset = content
request_to_content.request = request
attach_step(request_to_content)
self.input_datasets.append(request_to_content)
elif history_content_type == "dataset_collection":
request_to_content = WorkflowRequestToInputDatasetCollectionAssociation()
request_to_content.dataset_collection = content
request_to_content.request = request
attach_step(request_to_content)
self.input_dataset_collections.append(request_to_content)
else:
request_to_content = WorkflowRequestInputStepParameter()
request_to_content.parameter_value = content
request_to_content.request = request
attach_step(request_to_content)
self.input_step_parameters.append(request_to_content)

@@ -9468,6 +9492,7 @@ class WorkflowRequestToInputDatasetAssociation(Base, Dictifiable, Serializable):
workflow_invocation_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_invocation.id"), index=True)
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"))
dataset_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history_dataset_association.id"), index=True)
request: Mapped[Optional[Dict]] = mapped_column(JSONType)

workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
dataset: Mapped[Optional["HistoryDatasetAssociation"]] = relationship()
@@ -9503,6 +9528,7 @@ class WorkflowRequestToInputDatasetCollectionAssociation(Base, Dictifiable, Serializable):
workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(
back_populates="input_dataset_collections"
)
request: Mapped[Optional[Dict]] = mapped_column(JSONType)

history_content_type = "dataset_collection"
dict_collection_visible_keys = ["id", "workflow_invocation_id", "workflow_step_id", "dataset_collection_id", "name"]
@@ -9526,6 +9552,7 @@ class WorkflowRequestInputStepParameter(Base, Dictifiable, Serializable):
workflow_invocation_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_invocation.id"), index=True)
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"))
parameter_value: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
request: Mapped[Optional[Dict]] = mapped_column(JSONType)

workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="input_step_parameters")
@@ -9549,7 +9576,6 @@ class WorkflowInvocationOutputDatasetAssociation(Base, Dictifiable, Serializable):
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"), index=True)
dataset_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history_dataset_association.id"), index=True)
workflow_output_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_output.id"), index=True)

workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="output_datasets")
workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
dataset: Mapped[Optional["HistoryDatasetAssociation"]] = relationship()
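
A hedged sketch of how these model pieces fit together: the caller wraps an input in `InputWithRequest` so the originating request is stored on the association row, and `inputs_requiring_materialization` later picks out the HDAs whose requests were not marked deferred. Here `invocation`, `hda`, and `step` are assumed objects, the `add_input` signature is assumed from the surrounding diff, and `materialize` is a hypothetical stand-in for the scheduler's work:

```python
# Sketch: attach a dataset together with its originating {src: url} request.
request = {"src": "url", "url": "https://example.org/data/reads.fastq", "deferred": False}
invocation.add_input(InputWithRequest(input=hda, request=request), step=step)

# Later, the scheduler asks which inputs still need real bytes on disk:
for dataset in invocation.inputs_requiring_materialization():
    materialize(dataset)  # hypothetical: download/undefer before scheduling
```
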
1 change: 1 addition & 0 deletions lib/galaxy/schema/invocation.py
@@ -281,6 +281,7 @@ class InvocationMessageResponseModel(RootModel):

class InvocationState(str, Enum):
NEW = "new" # Brand new workflow invocation... maybe this should be same as READY
REQUIRES_MATERIALIZATION = "requires_materialization" # an otherwise NEW or READY workflow that requires inputs to be materialized (undeferred)
READY = "ready" # Workflow ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow has been scheduled.
CANCELLED = "cancelled"
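
A rough sketch (descriptive comments, not an API) of the extra hop the new state adds when the separate materialization iteration described in the config section above is enabled:

```python
# Sketch: lifecycle of an invocation with an undeferred URL input.
state = InvocationState.REQUIRES_MATERIALIZATION  # queued with URL inputs to fetch
# scheduler iteration 1: materialize the inputs, requeue the invocation
state = InvocationState.READY
# scheduler iteration 2+: normal scheduling of workflow steps
state = InvocationState.SCHEDULED
```
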
7 changes: 6 additions & 1 deletion lib/galaxy/workflow/run.py
@@ -144,7 +144,12 @@ def queue_invoke(
)
workflow_invocation = workflow_run_config_to_request(trans, workflow_run_config, workflow)
workflow_invocation.workflow = workflow
return trans.app.workflow_scheduling_manager.queue(workflow_invocation, request_params, flush=flush)
initial_state = model.WorkflowInvocation.states.NEW
if workflow_run_config.requires_materialization:
initial_state = model.WorkflowInvocation.states.REQUIRES_MATERIALIZATION
return trans.app.workflow_scheduling_manager.queue(
workflow_invocation, request_params, flush=flush, initial_state=initial_state
)


class WorkflowInvoker:
39 changes: 33 additions & 6 deletions lib/galaxy/workflow/run_request.py
@@ -10,11 +10,13 @@
)

from galaxy import exceptions
from galaxy.managers.hdas import dereference_input
from galaxy.model import (
EffectiveOutput,
History,
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
InputWithRequest,
LibraryDataset,
LibraryDatasetDatasetAssociation,
WorkflowInvocation,
@@ -25,6 +27,7 @@
ensure_object_added_to_session,
transaction,
)
from galaxy.tool_util.parameters import DataRequestUri
from galaxy.tools.parameters.meta import expand_workflow_inputs
from galaxy.workflow.resources import get_resource_mapper_function

@@ -57,6 +60,9 @@ class WorkflowRunConfig:
:param inputs: Map from step ids to dicts containing HDAs for these steps.
:type inputs: dict
:param requires_materialization: True if an input requires materialization before
the workflow is scheduled.
:param inputs_by: How inputs maps to inputs (datasets/collections) to workflows
steps - by unencoded database id ('step_id'), index in workflow
'step_index' (independent of database), or by input name for
@@ -78,6 +84,7 @@
copy_inputs_to_history: bool = False,
use_cached_job: bool = False,
resource_params: Optional[Dict[int, Any]] = None,
requires_materialization: bool = False,
preferred_object_store_id: Optional[str] = None,
preferred_outputs_object_store_id: Optional[str] = None,
preferred_intermediate_object_store_id: Optional[str] = None,
@@ -91,6 +98,7 @@
self.resource_params = resource_params or {}
self.allow_tool_state_corrections = allow_tool_state_corrections
self.use_cached_job = use_cached_job
self.requires_materialization = requires_materialization
self.preferred_object_store_id = preferred_object_store_id
self.preferred_outputs_object_store_id = preferred_outputs_object_store_id
self.preferred_intermediate_object_store_id = preferred_intermediate_object_store_id
Expand Down Expand Up @@ -310,7 +318,7 @@ def build_workflow_run_configs(
legacy = payload.get("legacy", False)
already_normalized = payload.get("parameters_normalized", False)
raw_parameters = payload.get("parameters", {})

requires_materialization: bool = False
run_configs = []
unexpanded_param_map = _normalize_step_parameters(
workflow.steps, raw_parameters, legacy=legacy, already_normalized=already_normalized
@@ -368,23 +376,30 @@
raise exceptions.RequestParameterInvalidException(
f"Not input source type defined for input '{input_dict}'."
)
if "id" not in input_dict:
raise exceptions.RequestParameterInvalidException(f"Not input id defined for input '{input_dict}'.")
input_source = input_dict["src"]
if "id" not in input_dict and input_source != "url":
raise exceptions.RequestParameterInvalidException(f"No input id defined for input '{input_dict}'.")
elif input_source == "url" and not input_dict.get("url"):
raise exceptions.RequestParameterInvalidException(
f"Supplied 'url' is empty or absent for input '{input_dict}'."
)
if "content" in input_dict:
raise exceptions.RequestParameterInvalidException(
f"Input cannot specify explicit 'content' attribute {input_dict}'."
)
input_source = input_dict["src"]
input_id = input_dict["id"]
input_id = input_dict.get("id")
try:
added_to_history = False
if input_source == "ldda":
assert input_id
ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, trans.security.decode_id(input_id))
assert ldda
assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
trans.get_current_user_roles(), ldda.dataset
)
content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
elif input_source == "ld":
assert input_id
library_dataset = trans.sa_session.get(LibraryDataset, trans.security.decode_id(input_id))
assert library_dataset
ldda = library_dataset.library_dataset_dataset_association
Expand All @@ -394,18 +409,29 @@ def build_workflow_run_configs(
)
content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
elif input_source == "hda":
assert input_id
# Get dataset handle, add to dict and history if necessary
content = trans.sa_session.get(HistoryDatasetAssociation, trans.security.decode_id(input_id))
assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
trans.get_current_user_roles(), content.dataset
)
elif input_source == "hdca":
content = app.dataset_collection_manager.get_dataset_collection_instance(trans, "history", input_id)
elif input_source == "url":
data_request = DataRequestUri.model_validate(input_dict)
hda: HistoryDatasetAssociation = dereference_input(trans, data_request, history)
added_to_history = True
content = InputWithRequest(
input=hda,
request=data_request.model_dump(mode="json"),
)
if not data_request.deferred:
requires_materialization = True
else:
raise exceptions.RequestParameterInvalidException(
f"Unknown workflow input source '{input_source}' specified."
)
if add_to_history and content.history != history:
if not added_to_history and add_to_history and content.history != history:
if isinstance(content, HistoryDatasetCollectionAssociation):
content = content.copy(element_destination=history, flush=False)
else:
@@ -474,6 +500,7 @@ def build_workflow_run_configs(
allow_tool_state_corrections=allow_tool_state_corrections,
use_cached_job=use_cached_job,
resource_params=resource_params,
requires_materialization=requires_materialization,
preferred_object_store_id=preferred_object_store_id,
preferred_outputs_object_store_id=preferred_outputs_object_store_id,
preferred_intermediate_object_store_id=preferred_intermediate_object_store_id,
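
Putting it all together, a hedged sketch of the kind of invocation request this commit enables, using the standard workflow invocation endpoint; the IDs and hostname are placeholders, and the input fields shown are just the ones exercised in the diff (`src`, `url`, `deferred`):

```python
# Sketch: invoke a workflow with a URL input instead of a pre-loaded dataset.
# deferred=False puts the invocation in REQUIRES_MATERIALIZATION so the
# scheduler fetches the data first; deferred=True keeps the dataset deferred.
import requests

payload = {
    "history_id": "<history_id>",
    "inputs_by": "step_index",
    "inputs": {
        "0": {
            "src": "url",
            "url": "https://example.org/data/reads.fastq",
            "deferred": False,
        },
    },
}
response = requests.post(
    "https://galaxy.example.org/api/workflows/<workflow_id>/invocations",
    headers={"x-api-key": "<api_key>"},
    json=payload,
)
response.raise_for_status()
print(response.json()["state"])  # e.g. "requires_materialization"
```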