Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support importing on workers in WDL #5103

Merged
merged 22 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
33fa295
mirror import on workers update to wdl
stxue1 Sep 26, 2024
9a6612a
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Sep 26, 2024
e90bfbe
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Sep 26, 2024
a46a001
Apply suggestions from code review
stxue1 Sep 27, 2024
c55330c
Merge branch 'master' of github.com:DataBiosphere/toil into issues/50…
stxue1 Sep 27, 2024
413e3ea
Rename WDLRootJob, remove kebab case from WDL, and raise an error if …
stxue1 Sep 27, 2024
0fcdf35
Move out shared runner options
stxue1 Sep 28, 2024
d615a4e
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Sep 30, 2024
361551d
Add runner.py
stxue1 Sep 30, 2024
10f02f2
Merge branch 'issues/5025-import-on-workers-wdl' of github.com:DataBi…
stxue1 Sep 30, 2024
c6b4079
Merge branch 'master' of github.com:DataBiosphere/toil into issues/50…
stxue1 Oct 2, 2024
9329327
Add logDebug to giraffe workflow
stxue1 Oct 3, 2024
881ab9a
Add toil.import_file logic into the wdl import function
stxue1 Oct 3, 2024
1bc9b3e
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 3, 2024
b6f2ab0
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 3, 2024
b016723
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 3, 2024
f626dea
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 4, 2024
a1ebbf4
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 4, 2024
7b2e167
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 7, 2024
3ba0391
Reconnect test command argument to its option
adamnovak Oct 7, 2024
0a17f9c
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 8, 2024
3f8c8ca
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/toil/options/wdl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from argparse import ArgumentParser

from configargparse import SUPPRESS
from toil.lib.conversions import human2bytes


def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None:
Expand Down Expand Up @@ -35,3 +36,18 @@ def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None:
container_arguments = ["--wdlContainer"] + (["--container"] if not suppress else [])
parser.add_argument(*container_arguments, dest="container", type=str, choices=["singularity", "docker", "auto"], default="auto",
help=suppress_help or "Container engine to use to run WDL tasks")

parser.add_argument(
"--runImportsOnWorkers", "--run-imports-on-workers",
action="store_true", default=False,
help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance."
"If set to true, the argument --importWorkersDisk must also be set.",
dest="run_imports_on_workers"
)

parser.add_argument("--importWorkersDisk", "--import-workers-disk",
help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available,"
"this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers.",
dest="import_workers_disk",
type=lambda x: human2bytes(str(x)),
default=None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think any of the other WDL-side options have the --kebab-case form.

Also, is it possible to share the option definitions for these between CWL and WDL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the function signatures are different, I think it's easier to keep them separate than figuring out a way to get around argparse, unless we remove kebab case from the CWL equivalent of these options, which I originally included as all other CWL options are kebab case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I can actually move it out into a separate function

68 changes: 55 additions & 13 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

import WDL.Error
import WDL.runtime.config
from configargparse import ArgParser
from configargparse import ArgParser, Namespace
from WDL._util import byte_size_units
from WDL.Tree import ReadSourceResult
from WDL.CLI import print_error
Expand All @@ -74,7 +74,7 @@
TemporaryID,
parse_accelerator,
unwrap,
unwrap_all)
unwrap_all, ParseableIndivisibleResource)
stxue1 marked this conversation as resolved.
Show resolved Hide resolved
from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException,
InvalidImportExportUrlException, LocatorException)
from toil.lib.accelerators import get_individual_local_accelerators
Expand Down Expand Up @@ -332,7 +332,7 @@ def first_mismatch(prefix: str, value: str) -> int:
return modified


def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None) -> Iterator[str]:
def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None, execution_dir: Optional[str] = None) -> Iterator[str]:
"""
Get potential absolute URIs to check for an imported file.

Expand Down Expand Up @@ -374,7 +374,8 @@ def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tr
full_path_list.append(Toil.normalize_uri(importer.pos.abspath))

# Then the current directory. We need to make sure to include a filename component here or it will treat the current directory with no trailing / as a document and relative paths will look 1 level up.
full_path_list.append(Toil.normalize_uri('.') + '/.')
# When importing on a worker, the cwd will be a tmpdir and will result in FileNotFoundError after os.path.abspath, so override with the execution dir
full_path_list.append(Toil.normalize_uri(execution_dir or '.') + '/.')

# Then the specified paths.
# TODO:
Expand Down Expand Up @@ -1449,7 +1450,8 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None:
task_container.input_path_map[host_path] = container_path
task_container.input_path_map_rev[container_path] = host_path

def import_files(environment: WDLBindings, task_path: str, toil: Toil, path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings:
def import_files(environment: WDLBindings, task_path: str, jobstore: AbstractJobStore, path: Optional[List[str]] = None, skip_remote: bool = False,
execution_dir: Optional[str] = None) -> WDLBindings:
"""
Make sure all File values embedded in the given bindings are imported,
using the given Toil object.
Expand All @@ -1471,7 +1473,7 @@ def import_file_from_uri(uri: str) -> str:
"""

tried = []
for candidate_uri in potential_absolute_uris(uri, path if path is not None else []):
for candidate_uri in potential_absolute_uris(uri, path if path is not None else [], execution_dir=execution_dir):
# Try each place it could be according to WDL finding logic.
tried.append(candidate_uri)
try:
Expand All @@ -1487,7 +1489,7 @@ def import_file_from_uri(uri: str) -> str:
# Actually import
# Try to import the file. Don't raise if we can't find it, just
# return None!
imported = toil.import_file(candidate_uri, check_existence=False)
imported = jobstore.import_file(candidate_uri)
if imported is None:
# Wasn't found there
continue
Expand Down Expand Up @@ -3454,6 +3456,32 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
self.defer_postprocessing(job)
return job.rv()

# @report_wdl_errors("run wdl import on worker", exit=True)
stxue1 marked this conversation as resolved.
Show resolved Hide resolved
class WDLImportJob(WDLSectionJob):
def __init__(self, target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, path: Optional[List[str]] = None, skip_remote: bool = False,
disk_size: Optional[ParseableIndivisibleResource] = None, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any):
"""
Job to take the inputs from the WDL workflow and import them on a worker instead of a leader. Assumes all local and cloud files are accessible.

This class is only used when runImportsOnWorkers is enabled.
"""
super().__init__(target.name, target.name, wdl_options=wdl_options, local=False,
disk=disk_size, **kwargs)
self._target = target
self._inputs = inputs
self._path = path
self._skip_remote = skip_remote

def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Import the workflow inputs and then create and run the workflow.
:return: Promise of workflow outputs
"""
input_bindings = import_files(self._inputs, self._target.name, file_store.jobStore, self._path, self._skip_remote, self._wdl_options.get("execution_dir"))
root_job = WDLRootJob(self._target, input_bindings, wdl_options=self._wdl_options)
self.addChild(root_job)
return root_job.rv()


@contextmanager
def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]:
Expand Down Expand Up @@ -3497,6 +3525,20 @@ def coerce(self: SelfType, desired_type: Optional[WDL.Type.Base] = None) -> WDL.
WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign]
WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign]


def make_root_job(target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, inputs_search_path: List[str], toil: Toil, wdl_options: Optional[Dict[str, str]], options: Namespace) -> WDLSectionJob:
if options.run_imports_on_workers:
# Run WDL imports on a worker instead
root_job: WDLSectionJob = WDLImportJob(target, inputs, inputs_search_path, skip_remote=options.reference_inputs, disk_size=options.import_workers_disk, wdl_options=wdl_options)
else:
# Run WDL imports on leader
# Import any files in the bindings
input_bindings = import_files(inputs, target.name, toil._jobStore, inputs_search_path, skip_remote=options.reference_inputs)
# Run the workflow and get its outputs namespaced with the workflow name.
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to rename WDLRootJob since it isn't always the root anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to WDLStartJob

return root_job


@report_wdl_errors("run workflow", exit=True)
def main() -> None:
"""
Expand All @@ -3514,6 +3556,11 @@ def main() -> None:
# TODO: Move cwltoil's generate_default_job_store where we can use it
options.jobStore = os.path.join(mkdtemp(), 'tree')

# Take care of incompatible arguments related to file imports
if options.run_imports_on_workers is True and options.import_workers_disk is None:
logger.error("Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.")
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to raise an error or exit with a nonzero exit code, or the workflow will look like it succeeded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to a RuntimeError


# Make sure we have an output directory (or URL prefix) and we don't need
# to ever worry about a None, and MyPy knows it.
# If we don't have a directory assigned, make one in the current directory.
Expand Down Expand Up @@ -3586,9 +3633,6 @@ def main() -> None:
logger.info("Inputs appear to come from a Github repository; adding repository root to file search path")
inputs_search_path.append(match.group(0))

# Import any files in the bindings
input_bindings = import_files(input_bindings, target.name, toil, inputs_search_path, skip_remote=options.reference_inputs)

# TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ?

# Get the execution directory
Expand All @@ -3599,9 +3643,7 @@ def main() -> None:
wdl_options["execution_dir"] = execution_dir
wdl_options["container"] = options.container
assert wdl_options.get("container") is not None

# Run the workflow and get its outputs namespaced with the workflow name.
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options)
root_job = make_root_job(target, input_bindings, inputs_search_path, toil, wdl_options, options)
output_bindings = toil.start(root_job)
if not isinstance(output_bindings, WDL.Env.Bindings):
raise RuntimeError("The output of the WDL job is not a binding.")
Expand Down