-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support importing on workers in WDL #5103
Changes from 2 commits
33fa295
9a6612a
e90bfbe
a46a001
c55330c
413e3ea
0fcdf35
d615a4e
361551d
10f02f2
c6b4079
9329327
881ab9a
1bc9b3e
b6f2ab0
b016723
f626dea
a1ebbf4
7b2e167
3ba0391
0a17f9c
3f8c8ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,7 +54,7 @@ | |
|
||
import WDL.Error | ||
import WDL.runtime.config | ||
from configargparse import ArgParser | ||
from configargparse import ArgParser, Namespace | ||
from WDL._util import byte_size_units | ||
from WDL.Tree import ReadSourceResult | ||
from WDL.CLI import print_error | ||
|
@@ -74,7 +74,7 @@ | |
TemporaryID, | ||
parse_accelerator, | ||
unwrap, | ||
unwrap_all) | ||
unwrap_all, ParseableIndivisibleResource) | ||
stxue1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException, | ||
InvalidImportExportUrlException, LocatorException) | ||
from toil.lib.accelerators import get_individual_local_accelerators | ||
|
@@ -332,7 +332,7 @@ def first_mismatch(prefix: str, value: str) -> int: | |
return modified | ||
|
||
|
||
def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None) -> Iterator[str]: | ||
def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None, execution_dir: Optional[str] = None) -> Iterator[str]: | ||
""" | ||
Get potential absolute URIs to check for an imported file. | ||
|
||
|
@@ -374,7 +374,8 @@ def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tr | |
full_path_list.append(Toil.normalize_uri(importer.pos.abspath)) | ||
|
||
# Then the current directory. We need to make sure to include a filename component here or it will treat the current directory with no trailing / as a document and relative paths will look 1 level up. | ||
full_path_list.append(Toil.normalize_uri('.') + '/.') | ||
# When importing on a worker, the cwd will be a tmpdir and will result in FileNotFoundError after os.path.abspath, so override with the execution dir | ||
full_path_list.append(Toil.normalize_uri(execution_dir or '.') + '/.') | ||
|
||
# Then the specified paths. | ||
# TODO: | ||
|
@@ -1449,7 +1450,8 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None: | |
task_container.input_path_map[host_path] = container_path | ||
task_container.input_path_map_rev[container_path] = host_path | ||
|
||
def import_files(environment: WDLBindings, task_path: str, toil: Toil, path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings: | ||
def import_files(environment: WDLBindings, task_path: str, jobstore: AbstractJobStore, path: Optional[List[str]] = None, skip_remote: bool = False, | ||
execution_dir: Optional[str] = None) -> WDLBindings: | ||
""" | ||
Make sure all File values embedded in the given bindings are imported, | ||
using the given Toil object. | ||
|
@@ -1471,7 +1473,7 @@ def import_file_from_uri(uri: str) -> str: | |
""" | ||
|
||
tried = [] | ||
for candidate_uri in potential_absolute_uris(uri, path if path is not None else []): | ||
for candidate_uri in potential_absolute_uris(uri, path if path is not None else [], execution_dir=execution_dir): | ||
# Try each place it could be according to WDL finding logic. | ||
tried.append(candidate_uri) | ||
try: | ||
|
@@ -1487,7 +1489,7 @@ def import_file_from_uri(uri: str) -> str: | |
# Actually import | ||
# Try to import the file. Don't raise if we can't find it, just | ||
# return None! | ||
imported = toil.import_file(candidate_uri, check_existence=False) | ||
imported = jobstore.import_file(candidate_uri) | ||
if imported is None: | ||
# Wasn't found there | ||
continue | ||
|
@@ -3454,6 +3456,32 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: | |
self.defer_postprocessing(job) | ||
return job.rv() | ||
|
||
# @report_wdl_errors("run wdl import on worker", exit=True) | ||
stxue1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
class WDLImportJob(WDLSectionJob): | ||
def __init__(self, target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, path: Optional[List[str]] = None, skip_remote: bool = False, | ||
disk_size: Optional[ParseableIndivisibleResource] = None, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any): | ||
""" | ||
Job to take the inputs from the WDL workflow and import them on a worker instead of a leader. Assumes all local and cloud files are accessible. | ||
|
||
This class is only used when runImportsOnWorkers is enabled. | ||
""" | ||
super().__init__(target.name, target.name, wdl_options=wdl_options, local=False, | ||
disk=disk_size, **kwargs) | ||
self._target = target | ||
self._inputs = inputs | ||
self._path = path | ||
self._skip_remote = skip_remote | ||
|
||
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: | ||
""" | ||
Import the workflow inputs and then create and run the workflow. | ||
:return: Promise of workflow outputs | ||
""" | ||
input_bindings = import_files(self._inputs, self._target.name, file_store.jobStore, self._path, self._skip_remote, self._wdl_options.get("execution_dir")) | ||
root_job = WDLRootJob(self._target, input_bindings, wdl_options=self._wdl_options) | ||
self.addChild(root_job) | ||
return root_job.rv() | ||
|
||
|
||
@contextmanager | ||
def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]: | ||
|
@@ -3497,6 +3525,20 @@ def coerce(self: SelfType, desired_type: Optional[WDL.Type.Base] = None) -> WDL. | |
WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign] | ||
WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign] | ||
|
||
|
||
def make_root_job(target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, inputs_search_path: List[str], toil: Toil, wdl_options: Optional[Dict[str, str]], options: Namespace) -> WDLSectionJob: | ||
if options.run_imports_on_workers: | ||
# Run WDL imports on a worker instead | ||
root_job: WDLSectionJob = WDLImportJob(target, inputs, inputs_search_path, skip_remote=options.reference_inputs, disk_size=options.import_workers_disk, wdl_options=wdl_options) | ||
else: | ||
# Run WDL imports on leader | ||
# Import any files in the bindings | ||
input_bindings = import_files(inputs, target.name, toil._jobStore, inputs_search_path, skip_remote=options.reference_inputs) | ||
# Run the workflow and get its outputs namespaced with the workflow name. | ||
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We probably need to rename […]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Renamed to […]. |
||
return root_job | ||
|
||
|
||
@report_wdl_errors("run workflow", exit=True) | ||
def main() -> None: | ||
""" | ||
|
@@ -3514,6 +3556,11 @@ def main() -> None: | |
# TODO: Move cwltoil's generate_default_job_store where we can use it | ||
options.jobStore = os.path.join(mkdtemp(), 'tree') | ||
|
||
# Take care of incompatible arguments related to file imports | ||
if options.run_imports_on_workers is True and options.import_workers_disk is None: | ||
logger.error("Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.") | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We probably need to raise an error or exit with a nonzero exit code, or the workflow will look like it succeeded. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Changed to a RuntimeError. |
||
|
||
# Make sure we have an output directory (or URL prefix) and we don't need | ||
# to ever worry about a None, and MyPy knows it. | ||
# If we don't have a directory assigned, make one in the current directory. | ||
|
@@ -3586,9 +3633,6 @@ def main() -> None: | |
logger.info("Inputs appear to come from a Github repository; adding repository root to file search path") | ||
inputs_search_path.append(match.group(0)) | ||
|
||
# Import any files in the bindings | ||
input_bindings = import_files(input_bindings, target.name, toil, inputs_search_path, skip_remote=options.reference_inputs) | ||
|
||
# TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ? | ||
|
||
# Get the execution directory | ||
|
@@ -3599,9 +3643,7 @@ def main() -> None: | |
wdl_options["execution_dir"] = execution_dir | ||
wdl_options["container"] = options.container | ||
assert wdl_options.get("container") is not None | ||
|
||
# Run the workflow and get its outputs namespaced with the workflow name. | ||
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options) | ||
root_job = make_root_job(target, input_bindings, inputs_search_path, toil, wdl_options, options) | ||
output_bindings = toil.start(root_job) | ||
if not isinstance(output_bindings, WDL.Env.Bindings): | ||
raise RuntimeError("The output of the WDL job is not a binding.") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think any of the other WDL-side options have the `--kebab-case` form.
Also, is it possible to share the option definitions for these between CWL and WDL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the function signatures are different, I think it's easier to keep them separate than to figure out a way around argparse, unless we remove kebab case from the CWL equivalents of these options. I originally included the kebab-case forms there because all other CWL options are kebab case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I can actually move it out into a separate function