
Allow CWL workflows to have jobs use all of a Slurm node's memory #5052

Merged
merged 39 commits into master from issues/4971-slurm-node-memory on Oct 14, 2024
Changes from all commits
39 commits
0fd0c52
Remove periods from option section headings
adamnovak Aug 8, 2024
b0c395f
Hide entire service section from CWL help
adamnovak Aug 8, 2024
aa1d275
Change Slurm memory allocation flag to new style, and document Slurm …
adamnovak Aug 8, 2024
7fc1f0c
Use but add a way to bypass the CWL spec default RAM, and rearrange o…
adamnovak Aug 8, 2024
1a397b3
Add --slurmDefaultAllMem to let jobs without their own memory require…
adamnovak Aug 8, 2024
768f227
Actually set new config entry
adamnovak Aug 8, 2024
ea1e3ea
Add a test for whole-node CWL memory on Slurm
adamnovak Aug 8, 2024
4b06802
Detect a job not getting a node as a passing test, and fix Slurm job …
adamnovak Aug 8, 2024
632c3b7
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 14, 2024
5d04bd1
Fix Slurm issue method typing
adamnovak Aug 14, 2024
ea52789
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 15, 2024
a6b2f75
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 19, 2024
2a6c3b1
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 19, 2024
104cf3c
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 21, 2024
2e71b7c
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 22, 2024
4960555
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 22, 2024
da79265
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 22, 2024
613df48
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 27, 2024
b8f410b
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Aug 27, 2024
77dcc47
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Sep 3, 2024
4ed2a07
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Sep 10, 2024
ac5b298
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Sep 10, 2024
97e98ce
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Sep 12, 2024
9939309
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Sep 16, 2024
93c2dbf
Merge branch 'master' into issues/4971-slurm-node-memory
adamnovak Sep 24, 2024
c132323
Merge branch 'master' into issues/4971-slurm-node-memory
adamnovak Sep 26, 2024
e6f42cf
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Sep 30, 2024
286ea86
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 1, 2024
bbdd778
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 3, 2024
301566f
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 3, 2024
9cdd365
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 3, 2024
0034548
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 4, 2024
8d09bd3
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 4, 2024
b376f54
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 7, 2024
6f4c864
Fix docs and remove dead code from code review
adamnovak Oct 7, 2024
488f965
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 8, 2024
a634866
Merge master into issues/4971-slurm-node-memory
github-actions[bot] Oct 9, 2024
f59b898
Merge remote-tracking branch 'upstream/master' into issues/4971-slurm…
adamnovak Oct 11, 2024
813efe6
Drop duplicate import options
adamnovak Oct 11, 2024
11 changes: 10 additions & 1 deletion docs/appendices/environment_vars.rst
@@ -131,6 +131,9 @@ There are several environment variables that affect the way Toil runs.
| TOIL_GOOGLE_PROJECTID | The Google project ID to use when generating |
| | Google job store names for tests or CWL workflows. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_ALLOCATE_MEM | Whether to allocate memory in Slurm with --mem. |
| | True by default. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_ARGS | Arguments for sbatch for the slurm batch system. |
| | Do not pass CPU or memory specifications here. |
| | Instead, define resource requirements for the job. |
@@ -140,9 +143,15 @@ There are several environment variables that affect the way Toil runs.
| | provided. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_PE | Name of the slurm partition to use for parallel |
| | jobs. |
| | jobs. Useful for Slurm clusters that do not offer |
| | a partition accepting both single-core and |
| | multi-core jobs. |
| | There is no default value for this variable. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_TIME | Slurm job time limit, in [DD-]HH:MM:SS format. For |
| | example, ``2-07:15:30`` for 2 days, 7 hours, 15 |
| | minutes and 30 seconds, or ``4:00:00`` for 4 hours.|
+----------------------------------+----------------------------------------------------+
| TOIL_GRIDENGINE_ARGS | Arguments for qsub for the gridengine batch |
| | system. Do not pass CPU or memory specifications |
| | here. Instead, define resource requirements for |
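For reference, TOIL_SLURM_TIME takes Slurm's ``[DD-]HH:MM:SS`` syntax. A minimal parser for that syntax might look like the sketch below (an illustration only; Toil's actual parse_slurm_time may differ in name, return type, and accepted forms):

def parse_slurm_time(slurm_time: str) -> int:
    """Parse a Slurm [DD-]HH:MM:SS time limit into a total number of seconds.

    Illustrative sketch only; Toil's own parser may behave differently.
    """
    days = 0
    if "-" in slurm_time:
        # The optional days component is separated from the clock part by a dash.
        day_part, slurm_time = slurm_time.split("-", 1)
        days = int(day_part)
    hours, minutes, seconds = (int(part) for part in slurm_time.split(":"))
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds

assert parse_slurm_time("2-07:15:30") == 2 * 86400 + 7 * 3600 + 15 * 60 + 30
assert parse_slurm_time("4:00:00") == 4 * 3600
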
14 changes: 11 additions & 3 deletions docs/cwl/running.rst
@@ -91,9 +91,17 @@ printed to the stdout stream after workflow execution.
``--stats``: Save resources usages in json files that can be collected with the
``toil stats`` command after the workflow is done.

``--disable-streaming``: Does not allow streaming of input files. This is enabled
by default for files marked with ``streamable`` flag True and only for remote files
when the jobStore is not on local machine.
Extra Toil CWL Options
++++++++++++++++++++++

Besides the normal Toil options and the options supported by cwltool, toil-cwl-runner adds some of its own options:

--bypass-file-store Do not use Toil's file store system and assume all paths are accessible in place from all nodes. This can avoid possibly-redundant file copies into Toil's job store storage, and is required for CWL's ``InplaceUpdateRequirement``. However, it allows a failed job execution to leave behind a partially-modified state, which means that a restarted workflow might not work correctly.
--reference-inputs Do not copy remote inputs into Toil's file store and assume they are accessible in place from all nodes.
--disable-streaming Do not allow streaming of job input files. By default, files marked with ``streamable`` True are streamed from remote job stores.
--cwl-default-ram Apply CWL specification default ramMin.
--no-cwl-default-ram Do not apply CWL specification default ramMin, so that Toil --defaultMemory applies.
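
As a usage sketch (not part of this diff; the workflow and input file names are placeholders), the new RAM options combine with the Slurm options added by this PR so that tools without an explicit ramMin receive a whole node's memory:

import subprocess

# Hypothetical invocation: skip the CWL spec's default ramMin so that tools
# without their own memory requirement fall through to --slurmDefaultAllMem
# and are submitted with Slurm's whole-node-memory sentinel.
subprocess.run(
    [
        "toil-cwl-runner",
        "--batchSystem", "slurm",
        "--no-cwl-default-ram",
        "--slurmDefaultAllMem", "True",
        "workflow.cwl", "inputs.yml",
    ],
    check=True,
)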


Running CWL in the Cloud
------------------------
22 changes: 17 additions & 5 deletions docs/running/cliOptions.rst
@@ -237,6 +237,23 @@ levels in toil are based on priority from the logging module:
Use a Mesos role.
--mesosName MESOSNAME
The Mesos name to use. (default: toil)
--scale SCALE A scaling factor to change the value of all submitted
tasks' submitted cores. Used in single_machine batch
system. Useful for running workflows on smaller
machines than they were designed for, by setting a
value less than 1. (default: 1)
--slurmAllocateMem SLURM_ALLOCATE_MEM
If False, do not use --mem. Used as a workaround for
Slurm clusters that reject jobs with memory
allocations.
--slurmTime SLURM_TIME
Slurm job time limit, in [DD-]HH:MM:SS format.
--slurmPE SLURM_PE Special partition to send Slurm jobs to if they ask
for more than 1 CPU. Useful for Slurm clusters that do
not offer a partition accepting both single-core and
multi-core jobs.
--slurmArgs SLURM_ARGS
Extra arguments to pass to Slurm.
--kubernetesHostPath KUBERNETES_HOST_PATH
Path on Kubernetes hosts to use as shared inter-pod temp
directory.
@@ -261,11 +278,6 @@ levels in toil are based on priority from the logging module:
The ARN of an IAM role to run AWS Batch jobs as, so they
can e.g. access a job store. Must be assumable by
ecs-tasks.amazonaws.com
--scale SCALE A scaling factor to change the value of all submitted
tasks' submitted cores. Used in single_machine batch
system. Useful for running workflows on smaller
machines than they were designed for, by setting a
value less than 1. (default: 1)

**Data Storage Options**
Allows configuring Toil's data storage.
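The same Slurm options can also be supplied through their environment variables rather than flags. The sketch below is illustrative only; the partition name and workflow files are site-specific placeholders:

import os
import subprocess

# Configure the new Slurm options via their environment variables.
env = dict(os.environ)
env["TOIL_SLURM_ALLOCATE_MEM"] = "False"   # work around clusters that reject --mem
env["TOIL_SLURM_TIME"] = "1-00:00:00"      # one-day time limit per job
env["TOIL_SLURM_PE"] = "multicore"         # partition for jobs asking for >1 CPU

subprocess.run(
    ["toil-cwl-runner", "--batchSystem", "slurm", "workflow.cwl", "inputs.yml"],
    check=True,
    env=env,
)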
54 changes: 35 additions & 19 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -24,7 +24,7 @@
UpdatedBatchJobInfo)
from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.job import JobDescription, AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry

@@ -79,6 +79,7 @@ def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queu
self.killedJobsQueue = killedJobsQueue
self.waitingJobs: List[JobTuple] = list()
self.runningJobs = set()
# TODO: Why do we need a lock for this? We have the GIL.
self.runningJobsLock = Lock()
self.batchJobIDs: Dict[int, str] = dict()
self._checkOnJobsCache = None
@@ -400,28 +401,36 @@ def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int
def supportsAutoDeployment(cls):
return False

def issueBatchJob(self, command: str, jobDesc, job_environment: Optional[Dict[str, str]] = None):
def count_needed_gpus(self, job_desc: JobDescription):
"""
Count the number of cluster-allocatable GPUs we want to allocate for the given job.
"""
gpus = 0
if isinstance(job_desc.accelerators, list):
for accelerator in job_desc.accelerators:
if accelerator['kind'] == 'gpu':
gpus += accelerator['count']
else:
gpus = job_desc.accelerators

return gpus

def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None):
# Avoid submitting internal jobs to the batch queue, handle locally
localID = self.handleLocalJob(command, jobDesc)
if localID is not None:
return localID
local_id = self.handleLocalJob(command, job_desc)
if local_id is not None:
return local_id
else:
self.check_resource_request(jobDesc)
jobID = self.getNextJobID()
self.currentJobs.add(jobID)
gpus = 0
if isinstance(jobDesc.accelerators, list):
for accelerator in jobDesc.accelerators:
if accelerator['kind'] == 'gpu':
gpus = accelerator['count']
else:
gpus = jobDesc.accelerators
self.check_resource_request(job_desc)
gpus = self.count_needed_gpus(job_desc)
job_id = self.getNextJobID()
self.currentJobs.add(job_id)

self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, command, get_job_kind(jobDesc.get_names()),
self.newJobsQueue.put((job_id, job_desc.cores, job_desc.memory, command, get_job_kind(job_desc.get_names()),
job_environment, gpus))
logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(jobID),
get_job_kind(jobDesc.get_names()))
return jobID
logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id),
get_job_kind(job_desc.get_names()))
return job_id

def killBatchJobs(self, jobIDs):
"""
@@ -511,6 +520,13 @@ def shutdown(self) -> None:
newJobsQueue.put(None)
self.background_thread.join()

# Now in one thread, kill all the jobs
if len(self.background_thread.runningJobs) > 0:
logger.warning("Cleaning up %s jobs still running at shutdown", len(self.background_thread.runningJobs))
for job in self.background_thread.runningJobs:
self.killQueue.put(job)
self.background_thread.killJobs()

def setEnv(self, name, value=None):
if value and ',' in value:
raise ValueError(type(self).__name__ + " does not support commata in environment variable values")
61 changes: 47 additions & 14 deletions src/toil/batchSystems/slurm.py
@@ -17,16 +17,18 @@
import math
import os
import sys
from argparse import ArgumentParser, _ArgumentGroup
from argparse import ArgumentParser, _ArgumentGroup, SUPPRESS
from shlex import quote
from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union, NamedTuple

from toil.bus import get_job_kind
from toil.common import Config
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE, InsufficientSystemResources
from toil.batchSystems.abstractGridEngineBatchSystem import \
AbstractGridEngineBatchSystem
from toil.batchSystems.options import OptionSetter
from toil.job import Requirer
from toil.job import JobDescription, Requirer
from toil.lib.conversions import strtobool
from toil.lib.misc import CalledProcessErrorStderr, call_command
from toil.statsAndLogging import TRACE

@@ -566,7 +568,7 @@ def prepareSbatch(self,
if cpu and cpu > 1 and parallel_env:
sbatch_line.append(f'--partition={parallel_env}')

if mem is not None and self.boss.config.allocate_mem: # type: ignore[attr-defined]
if mem is not None and self.boss.config.slurm_allocate_mem: # type: ignore[attr-defined]
# memory passed in is in bytes, but slurm expects megabytes
sbatch_line.append(f'--mem={math.ceil(mem / 2 ** 20)}')
if cpu is not None:
@@ -625,6 +627,33 @@ def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int
super().__init__(config, maxCores, maxMemory, maxDisk)
self.partitions = SlurmBatchSystem.PartitionSet()

# Override issuing jobs so we can check if we need to use Slurm's magic
# whole-node-memory feature.
def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
# Avoid submitting internal jobs to the batch queue, handle locally
local_id = self.handleLocalJob(command, job_desc)
if local_id is not None:
return local_id
else:
self.check_resource_request(job_desc)
gpus = self.count_needed_gpus(job_desc)
job_id = self.getNextJobID()
self.currentJobs.add(job_id)

if "memory" not in job_desc.requirements and self.config.slurm_default_all_mem: # type: ignore[attr-defined]
# The job doesn't have its own memory requirement, and we are
# defaulting to whole node memory. Use Slurm's 0-memory sentinel.
memory = 0
else:
# Use the memory actually on the job, or the Toil default memory
memory = job_desc.memory

self.newJobsQueue.put((job_id, job_desc.cores, memory, command, get_job_kind(job_desc.get_names()),
job_environment, gpus))
logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id),
get_job_kind(job_desc.get_names()))
return job_id

def _check_accelerator_request(self, requirer: Requirer) -> None:
for accelerator in requirer.accelerators:
if accelerator['kind'] != 'gpu':
@@ -647,26 +676,30 @@ def _check_accelerator_request(self, requirer: Requirer) -> None:

@classmethod
def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:
allocate_mem = parser.add_mutually_exclusive_group()
allocate_mem_help = ("A flag that can block allocating memory with '--mem' for job submissions "
"on SLURM since some system servers may reject any job request that "
"explicitly specifies the memory allocation. The default is to always allocate memory.")
allocate_mem.add_argument("--dont_allocate_mem", action='store_false', dest="allocate_mem", help=allocate_mem_help)
allocate_mem.add_argument("--allocate_mem", action='store_true', dest="allocate_mem", help=allocate_mem_help)
allocate_mem.set_defaults(allocate_mem=True)

parser.add_argument("--slurmAllocateMem", dest="slurm_allocate_mem", type=strtobool, default=True, env_var="TOIL_SLURM_ALLOCATE_MEM",
help="If False, do not use --mem. Used as a workaround for Slurm clusters that reject jobs "
"with memory allocations.")
# Keep these deprecated options for backward compatibility
parser.add_argument("--dont_allocate_mem", action='store_false', dest="slurm_allocate_mem", help=SUPPRESS)
parser.add_argument("--allocate_mem", action='store_true', dest="slurm_allocate_mem", help=SUPPRESS)

parser.add_argument("--slurmDefaultAllMem", dest="slurm_default_all_mem", type=strtobool, default=False, env_var="TOIL_SLURM_DEFAULT_ALL_MEM",
help="If True, assign Toil jobs without their own memory requirements all available "
"memory on a Slurm node (via Slurm --mem=0).")
parser.add_argument("--slurmTime", dest="slurm_time", type=parse_slurm_time, default=None, env_var="TOIL_SLURM_TIME",
help="Slurm job time limit, in [DD-]HH:MM:SS format")
help="Slurm job time limit, in [DD-]HH:MM:SS format.")
parser.add_argument("--slurmPE", dest="slurm_pe", default=None, env_var="TOIL_SLURM_PE",
help="Special partition to send Slurm jobs to if they ask for more than 1 CPU")
help="Special partition to send Slurm jobs to if they ask for more than 1 CPU.")
parser.add_argument("--slurmArgs", dest="slurm_args", default="", env_var="TOIL_SLURM_ARGS",
help="Extra arguments to pass to Slurm")
help="Extra arguments to pass to Slurm.")


OptionType = TypeVar('OptionType')
@classmethod
def setOptions(cls, setOption: OptionSetter) -> None:
setOption("allocate_mem")
setOption("slurm_allocate_mem")
setOption("slurm_default_all_mem")
setOption("slurm_time")
setOption("slurm_pe")
setOption("slurm_args")
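To summarize how these options shape the generated sbatch command line, here is a standalone sketch that mirrors the memory handling in prepareSbatch and issueBatchJob above (an illustration, not the actual Toil code):

import math
from typing import List, Optional

def sbatch_memory_args(mem_bytes: Optional[int], allocate_mem: bool) -> List[str]:
    """Choose the memory-related sbatch arguments.

    Sketch of the behavior shown in the diff above: with --slurmAllocateMem
    False no --mem flag is emitted at all, a memory of 0 bytes becomes Slurm's
    whole-node sentinel --mem=0, and anything else is rounded up to whole
    mebibytes.
    """
    if mem_bytes is None or not allocate_mem:
        return []
    return [f"--mem={math.ceil(mem_bytes / 2**20)}"]

print(sbatch_memory_args(0, True))           # ['--mem=0']  -> all of the node's memory
print(sbatch_memory_args(4 * 2**30, True))   # ['--mem=4096']
print(sbatch_memory_args(4 * 2**30, False))  # []
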
26 changes: 22 additions & 4 deletions src/toil/cwl/cwltoil.py
@@ -801,7 +801,7 @@ def visit(
# wherever else we would stage it.
# TODO: why would we do that?
stagedir = cast(Optional[str], obj.get("dirname")) or stagedir

if obj["class"] not in ("File", "Directory"):
# We only handle files and directories; only they have locations.
return
@@ -990,7 +990,7 @@ def visit(
# reference, we just pass that along.

"""Link or copy files to their targets. Create them as needed."""

logger.debug(
"ToilPathMapper adding file mapping %s -> %s", deref, tgt
)
@@ -2473,6 +2473,20 @@ def __init__(

req = tool.evalResources(self.builder, runtime_context)

tool_own_resources = tool.get_requirement("ResourceRequirement")[0] or {}
if "ramMin" in tool_own_resources or "ramMax" in tool_own_resources:
# The tool is actually asking for memory.
memory = int(req["ram"] * (2**20))
else:
# The tool is getting a default ram allocation.
if getattr(runtime_context, "cwl_default_ram"):
# We will respect the CWL spec and apply the default cwltool
# computed, which might be different than Toil's default.
memory = int(req["ram"] * (2**20))
else:
# We use a None requirement and the Toil default applies.
memory = None

accelerators: Optional[List[AcceleratorRequirement]] = None
if req.get("cudaDeviceCount", 0) > 0:
# There's a CUDARequirement, which cwltool processed for us
@@ -2537,7 +2551,7 @@ def __init__(

super().__init__(
cores=req["cores"],
memory=int(req["ram"] * (2**20)),
memory=memory,
disk=int(total_disk),
accelerators=accelerators,
preemptible=preemptible,
@@ -3837,6 +3851,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
)
runtime_context.workdir = workdir # type: ignore[attr-defined]
runtime_context.outdir = outdir
setattr(runtime_context, "cwl_default_ram", options.cwl_default_ram)
runtime_context.move_outputs = "leave"
runtime_context.rm_tmpdir = False
runtime_context.streaming_allowed = not options.disable_streaming
@@ -3875,11 +3890,14 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
if options.restart:
outobj = toil.restart()
else:
# TODO: why are we doing this? Does this get applied to all
# tools as a default or something?
loading_context.hints = [
{
"class": "ResourceRequirement",
"coresMin": toil.config.defaultCores,
"ramMin": toil.config.defaultMemory / (2**20),
# Don't include any RAM requirement because we want to
# know when tools don't manually ask for RAM.
"outdirMin": toil.config.defaultDisk / (2**20),
"tmpdirMin": 0,
}
1 change: 0 additions & 1 deletion src/toil/leader.py
@@ -1067,7 +1067,6 @@ def killJobs(self, jobsToKill, exit_reason: BatchJobExitReason = BatchJobExitRea

return jobsRerunning


#Following functions handle error cases for when jobs have gone awry with the batch system.

def reissueOverLongJobs(self) -> None: